这里是 combineByKey 的一个例子。目标是找到输入数据的每个键的平均值。
scala> val kvData = Array(("a",1),("b",2),("a",3),("c",9),("b",6))
kvData: Array[(String, Int)] = Array((a,1), (b,2), (a,3), (c,9), (b,6))
scala> val kvDataDist = sc.parallelize(kvData,5)
kvDataDist: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> val keyAverages = kvDataDist.combineByKey(x=>(x,1),(a: (Int,Int),x: Int)=>(a._1+x,a._2+1),(b: (Int,Int),c: (Int,Int))=>(b._1+c._1,b._2+c._2))
keyAverages: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[4] at combineByKey at <console>:25
scala> keyAverages.collect
res0: Array[(String, (Int, Int))] = Array((c,(9,1)), (a,(4,2)), (b,(8,2)))
scala> val keyAveragesFinal = keyAverages.map(x => (x._1,x._2._1/x._2._2))
keyAveragesFinal: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:25
scala> keyAveragesFinal.collect
res1: Array[(String, Int)] = Array((c,9), (a,2), (b,4))
combineByKey 接受 3 个函数作为参数:
-
函数 1 = createCombiner:在每个分区中,每个键“k”调用一次
- 输入:与键“k”关联的值
- 输出:基于程序逻辑的任何所需输出类型“O”。此输出类型将自动进一步使用。
在此示例中,选择的输出类型是 (Int,Int)。
Pair 中的第一个元素对值求和,第二个元素跟踪构成总和的元素的数量。
-
Function 2 = mergeValue : 与分区中键“k”的出现次数一样多次 - 1
- 输入:(createCombiner 的输出:O,与此分区中的键“k”关联的后续值)
- 输出:(输出:O)
-
函数 3 = mergeCombiners :调用次数与键所在的分区一样多
- 输入:(分区 X 的 mergeValue 输出:O,分区 Y 的 mergeValue 输出:O)
- 输出:(输出:O)