【问题标题】:Spark: How to split an RDD[T]` into Seq[RDD[T]] and preserve the orderingSpark:如何将 RDD[T]` 拆分为 Seq[RDD[T]] 并保留顺序
【发布时间】:2014-11-12 21:08:48
【问题描述】:

如何有效地将 RDD[T] 拆分为带有 n 元素的 Seq[RDD[T]] / Iterable[RDD[T]] 并保留原始顺序?

我希望能够写出这样的东西

RDD(1, 2, 3, 4, 5, 6, 7, 8, 9).split(3)

这应该会导致类似

Seq(RDD(1, 2, 3), RDD(4, 5, 6), RDD(7, 8, 9))

spark 有提供这样的功能吗?如果不是,那么实现这一目标的高效方法是什么?

val parts = rdd.length / n
val rdds = rdd.zipWithIndex().map{ case (t, i) => (i - (i % parts), t)}.groupByKey().values.map(iter => sc.parallelize(iter.toSeq)).collect

看起来不是很快..

【问题讨论】:

  • 这没有任何意义 - RDD 是对围绕集群分裂的事物的引用。您想要一堆不同的集群节点,每个节点都有......对跨所有集群节点拆分的东西的引用?可能rdd.mapPartitions(_.grouped(3)) 会做你想做的事,但我建议你退后一步,在更高的层次上询问你的问题——你到底想在这里实现什么?
  • 是的,你是对的。我需要像 Seq[RDD[T]] 或 Iterable[RDD[T]] 这样的东西。我将编辑问题...
  • 仍然没有多大意义 - 最好将 RDD 放在顶层,以便分区尽可能粗糙。
  • 我知道你的意思。但是最初的 RDD 来自一个非常大的文件,对于我的用例,我必须在一个循环中一个接一个地处理这个文件的大批量。因为可以以分布式方式处理这些大批量中的每一个,它们必须是 RDD 类型。这有意义吗?
  • 您想处理所有这些,不是吗?因此,让每个集群节点执行完整的批次比单独分配每个批次更有效(我的意思是,我假设您有比集群节点更多的批次)。 (当然,3 个元素的 RDD 是愚蠢的 - 集群开销将远大于收益 - 尽管我相信这只是一个示例)

标签: scala apache-spark spark-streaming rdd


【解决方案1】:

从技术上讲,您可以按照您的建议进行操作。但是,在利用计算集群来执行大数据的分布式处理的情况下,它确实没有意义。它首先违背了 Spark 的全部观点。如果您执行 groupByKey 然后尝试将它们提取到单独的 RDD 中,您实际上是将 RDD 中分布的所有数据拉到驱动程序上,然后将每个数据重新分配回集群。如果驱动程序不能加载整个数据文件,它也将无法执行此操作。

您不应将大型数据文件从本地文件系统加载到驱动程序节点。您应该将文件移动到 HDFS 或 S3 等分布式文件系统上。然后,您可以通过 val lines = SparkContext.textFile(...) 将单个大数据文件加载到集群中,并将其加载到行的 RDD 中。当您这样做时,集群中的每个工作人员将只加载文件的一部分,这是可以做到的,因为数据已经在分布式文件系统中分布在集群中。

如果您随后需要将数据组织成对数据的功能处理很重要的“批次”,您可以使用适当的批次标识符来键入数据,例如:val batches = lines.keyBy( line => lineBatchID(line) )

然后可以将每个批次归结为批次级别的汇总,这些汇总可以归结为单个整体结果。

为了测试 Spark 代码,可以将数据文件的 small 样本加载到单台机器上。但是当涉及到完整的数据集时,您应该利用分布式文件系统和 Spark 集群来处理这些数据。

【讨论】:

    猜你喜欢
    • 2015-02-27
    • 2011-12-26
    • 1970-01-01
    • 2015-11-20
    • 1970-01-01
    • 1970-01-01
    • 2019-09-19
    • 2014-09-22
    • 1970-01-01
    相关资源
    最近更新 更多