object TestTransfer {
def main(args: Array[String]): Unit = {
// 设置日志级别
Logger.getLogger("org").setLevel(Level.WARN)
// 初始化sc对象
val conf = new SparkConf().setMaster("local[2]").setAppName("My Spark")
val sc = SparkContext.getOrCreate(conf)
// 准备测试数据
val num: RDD[Int] = sc.parallelize(1 to 10)
val num1: RDD[Int] = sc.parallelize(List(1,5,3,2,5))
val num2: RDD[Int] = sc.parallelize(List(2,7,3,5,1,0))
val list: RDD[Int] = sc.parallelize(List(1,5,2,6,3,2))
// 转化为PairRDD,(key,value)的形式
val pairRdd: RDD[(Int, Int)] = list.map(x => (x,1))
val pairRdd2: RDD[(Int, Int)] = list.map(x => (x,0))
// 1. 普通的RDD
// 2. PairRDD(键值对操作)
// 我们在进行并行聚合、分组等操作时,常常需要利用键值对形式的RDD,称为Pair RDD
// PairRDD相比于普通的RDD多了一些方法
// 映射
// 1. map(func) RDD
// 作用于RDD的每一行,产出新RDD每一行为经过函数处理后的结果。
list.map(_*2).foreach(println)
// 2. flatMap(func) RDD
// 展平元素,将每个符合条件的元素都放在一个集合中
list.flatMap(Range(0,_)).foreach(println)
// 过滤
// filter(func) RDD
// 将每个元素根据指定函数条件过滤
list.filter(_ % 3 != 0).foreach(println)
// 联合
// union(other) RDD
num1.union(num2).foreach(x => print(x + " "))
// 交集
// intersection(other) RDD
num1.intersection(num2).foreach(x => print(x + " "))
// 集合相减
// subtract(other) RDD
num1.subtract(num2).foreach(x => print(x + " "))
// 去重复
// distinct() RDD
num.distinct().foreach(x => print(x + " "))
// 聚合操作
// 1. reduceByKey(func) PairRDD
pairRdd.reduceByKey(_+_).foreach(println)
// 2. combineByKey PairRDD
// createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)
// mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)
// mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)
val sum = pairRdd.combineByKey(v => (v,1),(a:(Int,Int),b:Int) => (a._1+b,a._2+1),(c1:(Int,Int),c2:(Int,Int)) => (c1._1+c2._1,c1._2+c2._2))
val avg = sum.map{case(key,value)=>(key,value._1/value._2.toFloat)}
avg.foreach(println)
// 分组操作
// 1. groupByKey() PairRDD
pairRdd.groupByKey().foreach(println)
// 2. groupBy(func) RDD
pairRdd.groupBy(x => x._1).foreach(println) // 根据key分组,相当于groupByKey
pairRdd.groupBy(x => x._2).foreach(println) // 根据value分组
// 排序
// sortByKey(boolean) PairRDD
// 默认true 升序 false 降序
pairRdd.sortByKey().foreach(println)
// sortBy(func,boolean) 自定义排序 RDD
pairRdd.sortBy(x => x._2).foreach(println)
// 连接操作
// join(other) 等值连接 PairRDD
val jnum: RDD[(Int, (Int, Int))] = pairRdd.join(pairRdd2)
jnum.foreach(println)
// rightOuterJoin(other) 右外连接 PairRDD
val rnum: RDD[(Int, (Option[Int], Int))] = pairRdd.rightOuterJoin(pairRdd2)
rnum.foreach(println)
// leftOuterJoin(other) 左外连接 PairRDD
val lnum: RDD[(Int, (Int, Option[Int]))] = pairRdd.leftOuterJoin(pairRdd2)
lnum.foreach(println)
// cogroup 全连接 PairRDD
val cnum: RDD[(Int, (Iterable[Int], Iterable[Int]))] = pairRdd.cogroup(pairRdd2)
cnum.foreach(println)
// subtractByKey 根据key集合相减 PairRDD
val snum: RDD[(Int, Int)] = pairRdd.subtractByKey(pairRdd2)
snum.foreach(println)
sc.stop()
}
}