思路分析:
1.此操作需要用到两个pairRDD,第一个pairRDD,key:当前页面,value:当前页面所含链接页面的合集。
2.第二个pairRDD,key:当前页面,value:当前页面的权重
3.两个pairRDD在循环中根据key进行连接操作,计算每次循环返回后各个页面的权重值
4.根据权重值的大小可知页面的排名

开发代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
object PageRankTest {
def main(args: Array[String]): Unit = {
// 设置日志级别
Logger.getLogger("org").setLevel(Level.WARN)
// 初始化sc对象
val conf = new SparkConf().setMaster("local[2]").setAppName("My Spark")
val sc = SparkContext.getOrCreate(conf)
// 获取程序开始时的时间
val start = System.currentTimeMillis()
// 建立模拟数据来测试
// key:当前页面
// value:链接的页面
val links = List(
("A",List("B","C")),
("B",List("C")),
("C",List("D")),
("D",List("A","C"))
)
// key:页面
// value:权重 初始默认值为1
val ranks = List(
("A",1.0),
("B",1.0),
("C",1.0),
("D",1.0)
)
// 创建页面的RDD 分区 持久化
val linksRDD = sc.parallelize(links).partitionBy(new HashPartitioner(4)).persist()
// 创建权重的RDD
var ranksRDD = sc.parallelize(ranks)
/**
* 迭代多次获取一个相对稳定的权重值
* 如果迭代次数过大,栈内存会溢出可以设置一下栈的最小内存
*
* 设置JVM虚拟内存的允许最小 最大 以及栈的最小内存
* -Xms1024m -Xmx2048m -Xss1024m
* */
// 设置迭代次数
val iterator = 50
(1 to iterator).foreach(x => {
/**
* 两个RDD在循环中进行连接操作,需要对其中一个RDD提前预定义分区。
* 由于ranksRDD中的数据需要发生变化,因此对linksRDD进行预定义分区
*
* 3134
* 并行度=4 2674 并行度=5 2568
* */
// 连接操作 join key --> String
// RDD[(页面,(链接数据集合,权重值))]
val joinRDD: RDD[(String, (List[String], Double))] = linksRDD.join(ranksRDD)
val newRDD: RDD[(String, Double)] = joinRDD.flatMap(x=>{
// 当前页面
val page = x._1
// (链接数据集合,权重值)
val (linkSet,rank) = x._2
// 当前page分给其他链接页面的平均值
val avg = rank / linkSet.size
// 链接的页面,分给的均值
linkSet.map(page1 => {
(page1,avg)
})
})
// 按照key,即页面相同,权重值聚合+ 聚合之后按照0.15+0.85*权重值重新修改值
ranksRDD = newRDD.reduceByKey(_+_).mapValues(rank => rank*0.85 + 0.15)
})
ranksRDD.foreach(println)
// 获取程序结束时的时间
val end = System.currentTimeMillis()
println("耗时:" + (end - start))
sc.stop()
}
}