RDD
1.0 RDD概念
RDD 是弹性分布式数据集,有flatMap等api,也是编程模型。RDD 之间有依赖,且可以分区
分区:RDD和MR可以将自身分区到每一个block,处理每一个block的数据,如同hdfs,文件都被划分为block分布式存储在集群中。RDD和MR都是并行的。
2.0 CODE
- RDD实例化,SparkCore的入口SparkContext
Driver和ClusterManager以及Worker的分布就如同C/S架构,SparkContext是Driver(前端客户端)最核心的组件。
Spark作为大入口,可以设置参数,设置jar包等
2.1 RDD创建
1 |
|
2.2 算子
map,flatMap 同java stream
ReduceByKey 接受二元元祖(k:v)
2.2.1算子分类:
基础数据类型的计算
k:v 计算(这里特指二元元组)(reduceByKey,groupByKey…..)
针对数字类型的操作(max,min….)
2.2.2 转换算子学习
map
flatMap
reduceByKey: 传入二元元组,按照key分组,传递分组的value计算
mapPartitions(并行): 和map的区别,map针对单个数据(如果在其内数据库访问,效率很低),mapPartitions(不让每一条数据执行访问数据库,按照分区访问数据库,效率高)将RDD内的所有分区数据一次传输过去,map的话得每次单条传输过去给执行器
1
2
3
4
5
6
7// 1. mapPartitions
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12), 2)
// 一个分区肯定不止一条数据
rdd1.mapPartitions(iter => {
// iter 是scala的数据类型
iter.map(_*10)
}).collect().foreach(println)mapPartitionsWithIndex(并行):
1
2
3
4
5
6val rdd2 = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12), 2)
// index 是分区号
rdd2.mapPartitionsWithIndex((index,iter)=>{
iter.foreach(x=>println(x+" belong index:"+index))
iter
}).collect()filter
1
2
3// true 就留下
val rdd3 = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12), 2)
rdd3.filter(_>6).collect().foreach(println)sample:如果数据太大,变为小数据集,随机抽取数据,保证速度
1
2
3sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12), 2)
// 参数1:是否有放回(是否能抽到同一个东西),false就是无放回,同一数不能抽取出两次 参数2:采样比例 参数3:种子,一般不指定
.sample(false,0.3).collect().foreach(println)mapValues: 针对二元元组,可以用map代替,但是这个更方便
1
2sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)),2)
.mapValues(_*10).collect().foreach(println)交集 并集 差集:interaction union subtract
1
2
3
4
5val rddx = sc.parallelize(Seq(1, 2, 3, 4, 5), 2)
val rddy = sc.parallelize(Seq(3, 4, 5, 6, 7), 2)
rddx.intersection(rddy).collect().foreach(println)
rddx.union(rddy).collect().foreach(println)
rddx.subtract(rddy).collect().foreach(println)groupByKey:每个分区重复的k:v可以出来,但是reduceByKey每个分区只能有一个key出来(可以减少io)
1
2
3// 7. 分组 groupByKey 本质是shuffle 生成key => 数组
sc.parallelize(Seq(("a", 1), ("a", 2), ("c", 3), ("c", 4)), 2)
.groupByKey().foreach(println)combineByKey: groupByKey和reduceByKey的底层
1
2
3
4
5
6
7
8// 8. combineByKey 算平均分
sc.parallelize(Seq(("tzq", 97.0), ("tzq", 98.0), ("tr", 88.0), ("tr", 92.0)), 2)
// 参数说明:1.将value初步转换(分区内) 2.在每个分区把上一步结果聚合 3. 在所有分区上把每个分区结果聚合 4.可选,分区函数 5.可选,是否在map端的Combine 6.序列化器
// 思路:将每个数据变成(分数,1) 然后聚合 (总分,几) 一个分区结果就出来了
// 然后将不同分区的均分聚合, 然后除 (均分,1)
// 写法说明:第一个函数作用于第一条数据后,接着将结果和第二条数据作为参数传入第二个函数。 前两个函数作用于每个分区,将每个分区的结果作为参数传递给第三个函数
.combineByKey((_,1),(c:(Double,Int),nextValue:Double)=>(c._1+nextValue,c._2+1),(c:(Double,Int),v:(Double,Int))=>(c._1+v._1,c._2+v._2))
.map(item=>(item._1,(item._2._1/item._2._2,1))).foreach(println)foldByKey:比起reduceByKey有一个初始值(会加到每个元组)
1
2
3// 9. foldByKey
sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 2)
.foldByKey(10)(_+_).collect().foreach(println)aggregateByKey: 先处理 后聚合
1
2
3
4// 10. aggregateByKey 打八折后的总价
// 参数说明:1. 初始值 2.seqop 作用于每个分区每条数据 传递初始值和每条数据的value 3. combOp 整体聚合生成最终结果
sc.parallelize(Seq(("a", 10.0), ("a", 20.0), ("c", 30.0), ("d", 40.0)), 2)
.aggregateByKey(0.8)((zeroValue,item)=>item*zeroValue,(curr,agg)=>curr+agg).foreach(println)join
1
2
3val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)), 2)
val rdd2 = sc.parallelize(Seq(("a", 10), ("a", 2), ("b", 12)), 2)
rdd1.join(rdd2).foreach(println)sortBy:作用于任何数据类型,sortByKey只用于kv 且只能按照key排序,写法简单
1
2
3
4
5
6
7
8// 12. sortBy
val rdd1 = sc.parallelize(Seq(2, 1, 6, 4, 5, 3, 7, 8, 9, 10, 11, 12), 2)
val rdd2 = sc.parallelize(Seq(("a", 2), ("b", 1), ("c", 3), ("d", 4)), 2)
// 参数:1.用哪个进行排序
rdd1.sortBy(item=>item).collect().foreach(println)
rdd2.sortBy(item=>item._2).collect().foreach(println)
rdd2.sortByKey().collect().foreach(printlnrepartition
1
2
3
4
5
6
7
8// 13. repartition
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 2)
// 重新分区 分区越多线程越多 为了节省资源 可以适当减少分区数量
println(rdd.repartition(4).partitions.size)
// 减少合并分区
println(rdd.coalesce(5,shuffle = true).partitions.size)
// repatition 重分区时 默认shuffle
// coalesce 重分区时 默认不shuffle 所以默认不增大分区
action 操作:一个actions生成一个job
collect
reduce 不是转换操作的reduceByKey,如果有10个不同key的多条数据,结果只有10条,但是reduce后只有1条,reduce针对所有数据类型
1
2
3
4
5
6// 14. reduce
println(sc.parallelize(Seq(1, 2, 3, 4, 5), 2)
.reduce(_ + _))
val res = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)), 2)
.reduce((curr, agg) => ("全部", curr._2 + agg._2))
println(res._2)foreach 不同于scala本身的foreach,spark的算子会推送到集群执行,collect会将数据拉倒driver端,所以排序后不collect直接调用foreach会并行遍历各自分区的数据
count
1
2
3val rdd = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)), 2)
println(rdd.count())
println(rdd.countByKey())first() 返回第一个 take(N) 返回前N个,takeSample(withReplacement,fract)乐死sample,区别在于这是个action,直接返回结果到driver
2.3 Spark的一些注意点
每个计算任务必须可以拆分并行
计算会对应到每个文件块
提高容错两种手段:保存数据集和状态到介质里 or 根据rdd依赖推算
2.4 弹性分布式数据集
RDD特性:
- 惰性求值,只有collect,reduce才会开始计算
- 分区
- RDD是只读的
- RDD容错高,保存RDD之间的依赖,当RDD2计算错误,从RDD1计算回来,缓存
弹性分布式数据集:
- RDD可以运行在集群中,
- 高容错,RDD数据可以缓存
- RDD可以不保存具体数据,只保留必备信息(依赖和计算函数)
2.5 shffle
Maper1————->reducer1
|—————
|
Maper2————->reducer2
Maper3
Mapper1 –> reducer1 ,Mapper1 –>reducer2,Mapper2 –> reducer1 ………
shuffle 分为mapper端和reduce端,mapper将数据放入partition的函数计算,求得分到哪个reducer
[例子](https://www.jianshu.com/p/7f8d4484bfbd)
2.6 RDD支持的数据类型
String,数字,KV,对象
kv:类型 省略
数字类型支持(都是action):
- count
- mean 均值
- max min sum
- variance 方差
- sampleVariance 采样中计算方差
- stdev 标准差
- sampleStdev 采样中计算标准差
- …………很多
2.0 spark core
主要内容就是RDD
3.0 案例(统计北京天气)
- 读取文件
- 抽取需要的列
- 按照日,时为基础,运行reduce 统计东西地区pm
- 排序,获取结果
1 | package com.tr.spark |
4.0 RDD 特性
4.1 RDD分区和shuffle
分区作用:
RDD经常需要读取外部系统文件创建(那么外部存储系统往往是支持分片的,Rdd需要分区来和外部系统的文件分片一一对应)
Rdd的分区是一个并行计算的实现手段
shuffle特点:
只有 kV类型有shuffle
查看RDD分区
- 进入控制台
spark-shell --master local[6]
- 执行一个rdd
sc.parallelize(Seq(1,2,3,4,5,6,7,8,9))
- 进入webUI查看
http://localhost:4040
怎么创建分区:
- 读取外部文件时指定分区数量
sc.parallize(Seq(1,2,3),2)
- 通过本地集合创建时指定分区数量
sc.textFile("/data/x.txt",2)
怎么重分区:
coalesce(N,false):可以将分区缩小,如果需要扩大分区,指定shuffle:true
repartitions(N): 相当于coalesce的默认shuffle为true
通过其他算子指定分区:一般通过shuffle的算子都可以手动指定分区数,如果没有指定,默认从父节点继承
shuffle过程简介:
rdd2 = rdd1.reduceByKey()
实质是rdd2的调用函数,rdd2调用这个函数从rdd1拉取数据
,那么如何确定数据流入哪个分区,通过Partitioner函数:HashPartitioner,rdd2的分区和rdd1的分区是交错联系的,rdd2的每个分区去rdd1的每个分区内拉取数据
4.2 RDD缓存
4.3 RDD的checkpoint
什么是checkpoint? 斩断RDD的依赖链
方式: 本地存储,可靠的:缓存在hdfs上
Rdd之间有很多依赖关系,依赖链过长的话当某个rdd错误,需要追溯很久,斩断依赖链,就是不用计算之前的依赖链。
但是如果rdd错误,且之前的rdd已经斩断,正常情况下,可以重放,从上一个被斩断的节点开始(这个节点的结果已经被存放在外部可靠介质中,直接取出结果)
checkpoint本质还是缓存,但是和cache的区别是:
- checkpoint 数据可以保存在hdfs这类可靠介质内,cache和persist只能放在内存和磁盘
- checkpoint可以斩断依赖链,但是cache和persist不可以
使用checkpoint
1 | // 1.读取文件 |