首先看一下 API 参考 docs
combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]
所以它接受我在下面定义的三个函数
scala> val createCombiner = (v:Int) => List(v)
createCombiner: Int => List[Int] = <function1>
scala> val mergeValue = (a:List[Int], b:Int) => a.::(b)
mergeValue: (List[Int], Int) => List[Int] = <function2>
scala> val mergeCombiners = (a:List[Int],b:List[Int]) => a.++(b)
mergeCombiners: (List[Int], List[Int]) => List[Int] = <function2>
一旦你定义了这些,你就可以在你的 combineByKey 调用中使用它,如下所示
scala> val list = List((1,5),(1,8),(1,40),(2,9),(2,20),(2,6))
list: List[(Int, Int)] = List((1,5), (1,8), (1,40), (2,9), (2,20), (2,6))
scala> val temp = sc.parallelize(list)
temp: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:30
scala> temp.combineByKey(createCombiner,mergeValue, mergeCombiners).collect
res27: Array[(Int, List[Int])] = Array((1,List(8, 40, 5)), (2,List(20, 9, 6)))
请注意,我在 Spark Shell 中对此进行了尝试,因此您可以在执行的命令下方看到输出。它们将帮助您加深理解。