RDD简介

RDD是弹性分布式数据集,完全弹性的,如果数据丢失一部分还可以重建。有自动容错、位置感知调度和可伸缩性。

RDD共包含两种计算方式,一种是transformations转换,一种是actions操作,每种计算方式包含一些常用的方法。
注:Transformations转换是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations转换时只会记录需要这样的转换,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。

创建RDD对象

1、 读取外部文件
2、 并行一个集合

1
2
3
4
5
6
7
8
9
10
11
// 构建conf对象 SparkConf对象
// 必须设置的两个参数:运行模式,app的名字
val conf = new SparkConf().setMaster("local").setAppName("My Spark")
// 构建sc对象 SparkContext对象
val sc = SparkContext.getOrCreate(conf)
// 1.读取外部文件
// textFile objectFile sequenceFile hadoopFile newAPIHadoopFile
val text:RDD[String] = sc.textFile("src/data/fist.txt")
// 2.通过并行一个集合
val num:RDD[Int] = sc.parallelize(List(1,2,5,8,4))

RDD编程操作

转换操作(transformations)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
object TestTransfer {
def main(args: Array[String]): Unit = {
// 设置日志级别
Logger.getLogger("org").setLevel(Level.WARN)
// 初始化sc对象
val conf = new SparkConf().setMaster("local[2]").setAppName("My Spark")
val sc = SparkContext.getOrCreate(conf)
// 准备测试数据
val num: RDD[Int] = sc.parallelize(1 to 10)
val num1: RDD[Int] = sc.parallelize(List(1,5,3,2,5))
val num2: RDD[Int] = sc.parallelize(List(2,7,3,5,1,0))
val list: RDD[Int] = sc.parallelize(List(1,5,2,6,3,2))
// 转化为PairRDD,(key,value)的形式
val pairRdd: RDD[(Int, Int)] = list.map(x => (x,1))
val pairRdd2: RDD[(Int, Int)] = list.map(x => (x,0))
// 1. 普通的RDD
// 2. PairRDD(键值对操作)
// 我们在进行并行聚合、分组等操作时,常常需要利用键值对形式的RDD,称为Pair RDD
// PairRDD相比于普通的RDD多了一些方法
// 映射
// 1. map(func) RDD
// 作用于RDD的每一行,产出新RDD每一行为经过函数处理后的结果。
list.map(_*2).foreach(println)
// 2. flatMap(func) RDD
// 展平元素,将每个符合条件的元素都放在一个集合中
list.flatMap(Range(0,_)).foreach(println)
// 过滤
// filter(func) RDD
// 将每个元素根据指定函数条件过滤
list.filter(_ % 3 != 0).foreach(println)
// 联合
// union(other) RDD
num1.union(num2).foreach(x => print(x + " "))
// 交集
// intersection(other) RDD
num1.intersection(num2).foreach(x => print(x + " "))
// 集合相减
// subtract(other) RDD
num1.subtract(num2).foreach(x => print(x + " "))
// 去重复
// distinct() RDD
num.distinct().foreach(x => print(x + " "))
// 聚合操作
// 1. reduceByKey(func) PairRDD
pairRdd.reduceByKey(_+_).foreach(println)
// 2. combineByKey PairRDD
// createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)
// mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)
// mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)
val sum = pairRdd.combineByKey(v => (v,1),(a:(Int,Int),b:Int) => (a._1+b,a._2+1),(c1:(Int,Int),c2:(Int,Int)) => (c1._1+c2._1,c1._2+c2._2))
val avg = sum.map{case(key,value)=>(key,value._1/value._2.toFloat)}
avg.foreach(println)
// 分组操作
// 1. groupByKey() PairRDD
pairRdd.groupByKey().foreach(println)
// 2. groupBy(func) RDD
pairRdd.groupBy(x => x._1).foreach(println) // 根据key分组,相当于groupByKey
pairRdd.groupBy(x => x._2).foreach(println) // 根据value分组
// 排序
// sortByKey(boolean) PairRDD
// 默认true 升序 false 降序
pairRdd.sortByKey().foreach(println)
// sortBy(func,boolean) 自定义排序 RDD
pairRdd.sortBy(x => x._2).foreach(println)
// 连接操作
// join(other) 等值连接 PairRDD
val jnum: RDD[(Int, (Int, Int))] = pairRdd.join(pairRdd2)
jnum.foreach(println)
// rightOuterJoin(other) 右外连接 PairRDD
val rnum: RDD[(Int, (Option[Int], Int))] = pairRdd.rightOuterJoin(pairRdd2)
rnum.foreach(println)
// leftOuterJoin(other) 左外连接 PairRDD
val lnum: RDD[(Int, (Int, Option[Int]))] = pairRdd.leftOuterJoin(pairRdd2)
lnum.foreach(println)
// cogroup 全连接 PairRDD
val cnum: RDD[(Int, (Iterable[Int], Iterable[Int]))] = pairRdd.cogroup(pairRdd2)
cnum.foreach(println)
// subtractByKey 根据key集合相减 PairRDD
val snum: RDD[(Int, Int)] = pairRdd.subtractByKey(pairRdd2)
snum.foreach(println)
sc.stop()
}
}

行动操作(actions)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
object TestActions {
def main(args: Array[String]): Unit = {
// 设置日志级别
Logger.getLogger("org").setLevel(Level.WARN)
// 初始化sc对象
val conf = new SparkConf().setMaster("local[2]").setAppName("My Spark")
val sc = SparkContext.getOrCreate(conf)
// 创建测试数据
val num: RDD[Int] = sc.parallelize(1 to 10)
// 1. collect()
// 将rdd内容转化为本地集合,返回一个List集合
// retrieve rdd contents as a local collection
num.collect().foreach(println)
// 2. take(n)
// 返回前几个元素
// return fist n elements
num.take(3).foreach(println)
// 3. top(n)
// 返回最大的n个值
num.top(3).foreach(println)
// 4. takeOrdered(n)
// 返回最小的n个值
num.takeOrdered(3).foreach(println)
// 5. takeSample(withReplace,num,[seed])
// 随机采样(是否放回,采样数量,随机种子)
num.takeSample(false,5).foreach(println)
// 6. foreach(func)
// RDD的每个元素会被func处理。在分区节点上运行,不会将结果收集到Drive节点
num.foreach(println)
// 7. count()
// 返回RDD元素个数
// count number of elements
println(num.count())
// 8. countByKey()
// 统计PairRDD的key出现的次数
val pairRdd = num.map(x => (x,1))
println(pairRdd.countByKey())
// 9. countByValue()
// 统计RDD的value出现的次数
println(num.countByValue())
// 10. reduce(func)
// 合并操作
// merge elements with an associative function
println(num.reduce(_+_)) // 计算总和
// 11. fold(初始值)(func)
// 带初始值的合并操作,每个分区计算一次+driver
println(num.fold(10)(_+_))
// 12. saveAsTextFile
// 将RDD的元素输出到指定的外部存储介质中,如HDFS/AFS中
num.saveAsTextFile("src/sql_out/text/te_1")
sc.stop()
}
}

RDD持久化方法

Spark通过cache()方法可以将RDD持久化到内存中,一旦首次被触发,该RDD将会被保留在计算节点的内存中,之后再调用这个RDD就不会再重复计算。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
object TestActions {
def main(args: Array[String]): Unit = {
// 设置日志级别
Logger.getLogger("org").setLevel(Level.WARN)
// 初始化sc对象
val conf = new SparkConf().setMaster("local[2]").setAppName("My Spark")
val sc = SparkContext.getOrCreate(conf)
// 创建测试数据
val num: RDD[Int] = sc.parallelize(1 to 10)
// 持久化到内存
num.cache()
sc.stop()
}
}

共享变量

多个task想要共享某个变量,Spark为此提供了两个共享变量,一种是Broadcast Variable(广播变量),另一种是Accumulator(累加变量)。

广播变量

Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,而不会为每个task都拷贝一份。当变量很大时,其最大的用处是优化性能,减少网络传输以及内存消耗。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 广播变量
// 广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。
// 广播变量可被用于有效地给每个节点一个大输入数据集的副本
// 注:为了确保所有的节点获得相同的变量,对象v在被广播之后就不应该再修改
@Test
def counter3()={
// 设置日志级别
Logger.getLogger("org").setLevel(Level.WARN)
// 初始化sc对象
val conf = new SparkConf().setMaster("local[2]").setAppName("My Spark")
val sc = SparkContext.getOrCreate(conf) // 小数据集
val list = List(1,3,5,2,4,6)
val rdd = sc.parallelize(List(1,3,2,3,4,5,6,1,3,4,5)).cache()
// 广播变量
val broadcast = sc.broadcast(list)
rdd.filter(num => {
// 使用到了匿名函数的非局部变量
// 从广播变量中获取数据 就近原则 本地的worker
val list = broadcast.value
list.contains(num)
}).foreach(println)
}

累加变量

Accumulator可以让多个task共同操作一份变量,主要可以进行累加操作。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// 累加器
// 累加器,提供了将工作节点中的值聚合到驱动器程序中的简单语法。
// 累加器的一个常见用途是在调式时对作业执行过程中的事件进行计数。
// 四种形式
// a. sc.longAccumulator("累加器名字");
// b. sc.doubleAccumulator("累加器名字");
// c. sc.collectionAccumulator[T]("");
// d. 自定义累加器 extends AccumulatorV2[T,T]
@Test
def counter1()={
// 设置日志级别
Logger.getLogger("org").setLevel(Level.WARN)
// 初始化sc对象
val conf = new SparkConf().setMaster("local[2]").setAppName("My Spark")
val sc = SparkContext.getOrCreate(conf)
// 累加器
var countA = sc.longAccumulator("奇数个数")
val list = List(1,3,5,2,4,6)
// 计算rdd中所有的偶数和以及奇数个数
val rdd = sc.parallelize(list)
// 一次性计算两个值
val result = rdd.filter(
x => {
// 注:累加器最好写在行动操作中
// 如果写在转化操作中,后面每次行动操作都会调用之前转化操作中的累加器,造成数值错误
// 解决方案:1 在第一次行动操作前先cache 2 避免写在转化操作中
if (x%2 != 0) countA.add(1L) // 累加器
x%2 == 0
}
).reduce(_+_)
println(result)
println(countA.value) // 通过累加器的value属性获取到累加值为3
}
// 对于行动操作中的累加器,Spark只会把每个任务对各类假期的修改应用一次,无论失败还是重复计算时都绝对可靠。
// 对于转化操作,可能会发生不止一次更新。
// 解决方案:1 在第一次行动操作前先cache 2 避免写在转化操作中
@Test
def counter2()={
// 设置日志级别
Logger.getLogger("org").setLevel(Level.WARN)
// 初始化sc对象
val conf = new SparkConf().setMaster("local[2]").setAppName("My Spark")
val sc = SparkContext.getOrCreate(conf)
// 累加器
var countA = sc.longAccumulator("奇数个数")
val list = List(1,3,5,2,4,6)
// 计算rdd中所有的偶数和以及奇数个数
val rdd = sc.parallelize(list)
// 一次性计算两个值
val sum = rdd.filter(
x => {
if (x%2 != 0) countA.add(1L) // 累加器
x%2 == 0
}
)
// 没有行动操作
println(countA.value)
sum.cache().reduce(_+_) // 第一次调用行动操作之前先缓存cache
// sum.reduce(_+_)
// 有一个行动操作
println(countA.value)
sum.count()
// 有两个行动操作
println(countA.value)
}