【问题标题】:Spark RDD's - how do they workSpark RDD——它们是如何工作的
【发布时间】:2015-02-11 14:37:21
【问题描述】:

我有一个在单节点上运行良好的小型 Scala 程序。但是,我正在扩展它,以便它在多个节点上运行。这是我的第一次这样的尝试。我只是想了解 RDD 在 Spark 中是如何工作的,所以这个问题是基于理论的,可能不是 100% 正确的。

假设我创建了一个 RDD: val rdd = sc.textFile(file)

现在,一旦我这样做了,这是否意味着 file 处的文件现在已跨节点分区(假设所有节点都可以访问文件路径)?

其次,我想计算 RDD 中的对象数量(足够简单),但是,我需要在需要应用于 RDD 中的对象的计算中使用该数字 - 伪代码示例:

rdd.map(x => x / rdd.size)

假设rdd 中有 100 个对象,假设有 10 个节点,因此每个节点计数 10 个对象(假设这是 RDD 概念的工作方式),现在当我调用该方法时,每个节点都在运行使用rdd.size 作为10100 执行计算?因为总体而言,RDD 的大小为100,但在每个节点本地它只有10。在进行计算之前,我是否需要制作广播变量?此问题与以下问题相关。

最后,如果我对 RDD 进行转换,例如rdd.map(_.split("-")),然后我想要RDD的新的size,是否需要对RDD执行一个动作,比如count(),这样所有信息都发回驱动节点?

【问题讨论】:

  • '这个问题与下面的问题相关联。' --> ??
  • 我想你的意思是rdd.flatMap(_.split("-"))

标签: scala apache-spark bigdata distributed-computing rdd


【解决方案1】:
val rdd = sc.textFile(file)

这是否意味着文件现在已跨节点分区?

文件保留在原来的位置。生成的RDD[String] 的元素是文件的行。 RDD 被分区以匹配底层文件系统的自然分区。分区数不取决于您拥有的节点数。

重要的是要了解,当执行此行时,它不会读取文件。 RDD 是一个惰性对象,只会在必须的时候做一些事情。这很棒,因为它避免了不必要的内存使用。

例如,如果您写val errors = rdd.filter(line => line.startsWith("error")),仍然没有任何反应。如果你然后写val errorCount = errors.count 现在你的操作序列将需要执行,因为count 的结果是一个整数。然后,每个工作核心(执行程序线程)将并行执行的操作是读取一个文件(或一段文件),遍历其行,并计算以“错误”开头的行。除了缓冲和 GC,每个内核一次只有一行在内存中。这使得在不使用大量内存的情况下处理非常大的数据成为可能。

我想计算 RDD 中的对象数量,但是,我需要在需要应用于 RDD 中的对象的计算中使用该数字 - 伪代码示例:

rdd.map(x => x / rdd.size)

没有rdd.size 方法。有rdd.count,统计RDD中的元素个数。 rdd.map(x => x / rdd.count) 不起作用。该代码将尝试将rdd 变量发送给所有工作人员,并且将失败并返回NotSerializableException。你可以做的是:

val count = rdd.count
val normalized = rdd.map(x => x / count)

这可行,因为countInt 并且可以序列化。

如果我对 RDD 进行转换,例如rdd.map(_.split("-")),然后我想要新的RDD大小,是否需要对RDD执行一个动作,比如count(),这样所有信息都发回驱动节点?

map 不会改变元素的数量。我不知道你说的“大小”是什么意思。但是,是的,您需要执行一个操作,例如 count 以从 RDD 中获取任何内容。您会看到,在您执行某个操作之前,根本不会执行任何工作。 (当你执行count时,只会将每个分区的计数发送回驱动程序,当然不是“所有信息”。)

【讨论】:

  • 我根据您在the documentation 中的回答做了一个python 示例,如果您喜欢,可以将它包含在您的回答中!
  • 这应该是公认的答案。它完整​​而正确地回答了所有部分。
【解决方案2】:

通常,文件(或文件的一部分,如果它太大的话)会被复制到集群中的 N 个节点(在 HDFS 上默认 N=3)。并不是要在所有可用节点之间拆分每个文件。

但是,对于您(即客户端)来说,使用 Spark 处理文件应该是透明的 - 无论拆分和/或复制多少个节点,您都不应该看到 rdd.size 的任何差异。有一些方法(至少在 Hadoop 中)可以找出文件目前可以位于哪些节点(部分)上。但是,在简单的情况下,您很可能不需要使用此功能。

更新:一篇描述 RDD 内部的文章:https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf

【讨论】:

  • 感谢您的回复。因此,对于像这样的计算:rdd.filter(...).map(x => x * rdd.count) 是在每个节点执行filter 步骤之前,任何节点都可以执行map 步骤吗?因为很明显,map 步骤依赖于已经在每个节点上执行的filter 步骤,因为map 包含rdd.count。再次感谢。
  • 当然,因为map 是建立在filter 之上的(阅读文章中的“血统”概念)。
  • 感谢您的信息,这是一本好书,但是,我现在想知道广播变量的用途是什么?再次感谢,不胜感激!
  • 伯克利链接现已失效。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-09-08
  • 2017-02-11
  • 2011-08-30
  • 2017-05-05
  • 1970-01-01
相关资源
最近更新 更多