【发布时间】:2016-12-11 09:39:00
【问题描述】:
我想要在RDD 性能中执行类似reduce 的操作,但不需要运算符是可交换的。即我希望后面的result 永远是"123456789"。
scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> val result = rdd.someAction{ _+_ }
首先,我找到了fold。 RDD#fold 的文档说:
def fold(zeroValue: T)(op: (T, T) ⇒ T): T 聚合元素 每个分区,然后是所有分区的结果,使用 给定关联函数和中性“零值”
请注意,文档中不需要 commutative。但是,结果并不如预期:
scala> rdd.fold(""){ _+_ }
res10: String = 312456879
编辑我已经尝试过@dk14 提到的,但没有运气:
scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res22: String = 341276895
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res23: String = 914856273
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res24: String = 742539618
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res25: String = 271468359
【问题讨论】:
-
您错过了文档的下一部分,该部分描述了您所看到的内容:“这与在 Scala 等函数式语言中为非分布式集合实现的折叠操作有些不同。这个折叠操作可以单独应用于分区,然后将这些结果折叠到最终结果中,而不是以某种定义的顺序依次对每个元素应用折叠。对于不可交换的函数,结果可能与应用到的折叠的结果不同非分布式集合。”
标签: scala apache-spark rdd reduce fold