【发布时间】:2014-11-12 21:08:48
【问题描述】:
如何有效地将 RDD[T] 拆分为带有 n 元素的 Seq[RDD[T]] / Iterable[RDD[T]] 并保留原始顺序?
我希望能够写出这样的东西
RDD(1, 2, 3, 4, 5, 6, 7, 8, 9).split(3)
这应该会导致类似
Seq(RDD(1, 2, 3), RDD(4, 5, 6), RDD(7, 8, 9))
spark 有提供这样的功能吗?如果不是,那么实现这一目标的高效方法是什么?
val parts = rdd.length / n
val rdds = rdd.zipWithIndex().map{ case (t, i) => (i - (i % parts), t)}.groupByKey().values.map(iter => sc.parallelize(iter.toSeq)).collect
看起来不是很快..
【问题讨论】:
-
这没有任何意义 - RDD 是对围绕集群分裂的事物的引用。您想要一堆不同的集群节点,每个节点都有......对跨所有集群节点拆分的东西的引用?可能
rdd.mapPartitions(_.grouped(3))会做你想做的事,但我建议你退后一步,在更高的层次上询问你的问题——你到底想在这里实现什么? -
是的,你是对的。我需要像 Seq[RDD[T]] 或 Iterable[RDD[T]] 这样的东西。我将编辑问题...
-
仍然没有多大意义 - 最好将 RDD 放在顶层,以便分区尽可能粗糙。
-
我知道你的意思。但是最初的 RDD 来自一个非常大的文件,对于我的用例,我必须在一个循环中一个接一个地处理这个文件的大批量。因为可以以分布式方式处理这些大批量中的每一个,它们必须是 RDD 类型。这有意义吗?
-
您想处理所有这些,不是吗?因此,让每个集群节点执行完整的批次比单独分配每个批次更有效(我的意思是,我假设您有比集群节点更多的批次)。 (当然,3 个元素的 RDD 是愚蠢的 - 集群开销将远大于收益 - 尽管我相信这只是一个示例)
标签: scala apache-spark spark-streaming rdd