【问题标题】:Is there any action in RDD keeps the order?RDD中是否有任何动作保持顺序?
【发布时间】:2016-12-11 09:39:00
【问题描述】:

我想要在RDD 性能中执行类似reduce 的操作,但不需要运算符是可交换的。即我希望后面的result 永远是"123456789"

scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> val result = rdd.someAction{ _+_ }

首先,我找到了foldRDD#fold 的文档说:

def fold(zeroValue: T)(op: (T, T) ⇒ T): T 聚合元素 每个分区,然后是所有分区的结果,使用 给定关联函数和中性“零值”

请注意,文档中不需要 commutative。但是,结果并不如预期:

scala> rdd.fold(""){ _+_ }
res10: String = 312456879

编辑我已经尝试过@dk14 提到的,但没有运气:

scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res22: String = 341276895

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res23: String = 914856273

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res24: String = 742539618

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res25: String = 271468359

【问题讨论】:

  • 您错过了文档的下一部分,该部分描述了您所看到的内容:“这与在 Scala 等函数式语言中为非分布式集合实现的折叠操作有些不同。这个折叠操作可以单独应用于分区,然后将这些结果折叠到最终结果中,而不是以某种定义的顺序依次对每个元素应用折叠。对于不可交换的函数,结果可能与应用到的折叠的结果不同非分布式集合。”

标签: scala apache-spark rdd reduce fold


【解决方案1】:

Scala 中没有满足此标准的内置归约操作,但您可以通过结合 mapPartitionscollect 和局部归约轻松实现自己的操作:

import scala.reflect.ClassTag

def orderedFold[T : ClassTag](rdd: RDD[T])(zero: T)(f: (T, T) => T): T = {
  rdd.mapPartitions(iter => Iterator(iter.foldLeft(zero)(f))).collect.reduce(f)
}

使用collectreduce 的组合进行合并而不是fold 使用的异步和无序方法可确保保留全局顺序。

这当然会带来一些额外的费用,包括:

  • 驱动程序的内存占用略高。
  • 延迟显着增加 - 在开始本地缩减之前,我们明确等待所有任务完成。

【讨论】:

  • 感谢您的帮助,这是否意味着每个分区始终是整个RDD的连续子序列?有没有提到的文件?
  • 关于文档 - 我不知道。但是,它或多或少受到某些有序方法的模型和合同的约束。 Spark 中真正的问题是如何确定整体顺序。一般来说,当您推理顺序时有两种情况 a) 当您使用显式排序(按合同)时 b) 当您有输入生成确定性有序拆分并且在输入和当前点之间没有洗牌和其他数据移动时。
【解决方案2】:

正如@YuvalItzchakov 所指出的,fold 在组合结果时不会保留分区RDD 中的排序。为了说明这一点,请考虑将原始 RDD 合并到一个分区中,

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)).coalesce(1)
rdd: org.apache.spark.rdd.RDD[String] = CoalescedRDD[27] at coalesce at <console>:27

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res4: String = 123456789

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res5: String = 123456789

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res6: String = 123456789

【讨论】:

  • 需要注意的是,这样做会导致完全失去计算的并行能力。
  • @YuvalItzchakov 确定;使用fold,排序可能不会保留在分区RDD中。
  • 是的,我明白了。但是 OP 应该意识到这一点。
猜你喜欢
  • 2020-05-27
  • 1970-01-01
  • 2018-12-28
  • 1970-01-01
  • 2015-05-30
  • 1970-01-01
  • 1970-01-01
  • 2020-09-29
  • 2015-07-08
相关资源
最近更新 更多