【发布时间】:2016-05-07 04:01:40
【问题描述】:
我的 RDD 为 (String,String,Int)。
- 我想根据前两个字符串减少它
- 然后根据第一个字符串,我想对 (String,Int) 进行分组并对它们进行排序
- 排序后,我需要将它们分成小组,每组包含 n 个元素。
我已经完成了下面的代码。问题是步骤 2 中的元素数量对于单个键来说非常大
reduceByKey(x++y) 需要很多时间。
//Input
val data = Array(
("c1","a1",1), ("c1","b1",1), ("c2","a1",1),("c1","a2",1), ("c1","b2",1),
("c2","a2",1), ("c1","a1",1), ("c1","b1",1), ("c2","a1",1))
val rdd = sc.parallelize(data)
val r1 = rdd.map(x => ((x._1, x._2), (x._3)))
val r2 = r1.reduceByKey((x, y) => x + y ).map(x => ((x._1._1), (x._1._2, x._2)))
// This is taking long time.
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y)
// from the list I will be doing grouping.
val r4 = r3.map(x => (x._1 , x._2.toList.sorted.grouped(2).toList))
问题是“c1”有很多独特的条目,如 b1 ,b2 ....million 和 reduceByKey 正在消磨时间,因为所有值都将发送到单个节点。
有没有办法更有效地实现这一目标?
// output
Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))
【问题讨论】:
-
您可以尝试在第一个reduceByKey之后重新分区数据,然后使用combineByKey代替map-mapValues-reduceByKey,我认为这有助于平衡工作量
标签: scala apache-spark rdd reduce