【问题标题】:Unexpected spark caching behavior意外的火花缓存行为
【发布时间】:2018-04-14 01:25:56
【问题描述】:

我有一个 spark 程序,基本上可以做到这一点:

def foo(a: RDD[...], b: RDD[...]) = {
  val c = a.map(...)
  c.persist(StorageLevel.MEMORY_ONLY_SER)
  var current = b
  for (_ <- 1 to 10) {
    val next = some_other_rdd_ops(c, current)
    next.persist(StorageLevel.MEMORY_ONLY)
    current.unpersist()
    current = next
  }
  current.saveAsTextFile(...)
}

我看到的奇怪行为是对应于val c = a.map(...) 的火花阶段发生了 10 次。由于下一行会立即缓存,我本来希望这种情况只会发生一次,但事实并非如此。当我查看正在运行的作业的“存储”选项卡时,很少有 c 的分区被缓存。

此外,该阶段的 10 个副本立即显示为“活动”。 val next = some_other_rdd_ops(c, current)对应的stage的10个副本显示为pending,它们大致交替执行。

我是否误解了如何让 Spark 缓存 RDD?

编辑:这是一个包含重现此程序的要点:https://gist.github.com/jfkelley/f407c7750a086cdb059c。它期望输入图的边列表(带有边权重)。例如:

a   b   1000.0
a   c   1000.0
b   c   1000.0
d   e   1000.0
d   f   1000.0
e   f   1000.0
g   h   1000.0
h   i   1000.0
g   i   1000.0
d   g   400.0

要点的第 31-42 行对应于上面的简化版本。当我只期望 1 个阶段时,我得到了对应于第 31 行的 10 个阶段。

【问题讨论】:

  • 我认为您的期望是正确的。也许代码有问题?你能提供一个我们可以重现问题的例子吗?一种可能的解释是,当您继续将内容放入缓存时,它会推出c。不过我不确定是不是这样。
  • Daniel 关于缓存被驱逐的猜测是正确的。此外,some_other_rdd_ops 对我们来说是一个黑匣子……所以它可能会做一些意想不到的事情。
  • 我会更多地研究您的current.unpersist() 声明。你确定 c 永远不会成为当前的吗?
  • @marios,是的,我确定。 c 和 current 无论如何都有不同的类型。 @JustinPihony,some_other_rdd_ops 是:c.join(current.map(...)).aggregateByKey(...).mapValues(...)。没有persist/unpersist、collect、saveToTextFile等。
  • @DanielDarabos 当然,我添加了一个完全可执行的示例来重现这一点。抱歉,这有点复杂;这就是我最初发布简化版本的原因。

标签: scala apache-spark rdd


【解决方案1】:

这里的问题是调用cache 是懒惰的。在触发操作并评估 RDD 之前,不会缓存任何内容。该调用所做的只是在 RDD 中设置一个标志,以指示它在评估时应该被缓存。

然而,Unpersist 会立即生效。它清除指示应该缓存 RDD 的标志,并开始从缓存中清除数据。由于您在应用程序结束时只有一个操作,这意味着在评估任何 RDD 时,Spark 不会看到它们中的任何一个应该被持久化!

我同意这是令人惊讶的行为。一些 Spark 库(包括 GraphX 中的 PageRank 实现)解决此问题的方法是在对 cacheunpersist 的调用之间显式实现每个 RDD。例如,在您的情况下,您可以执行以下操作:

def foo(a: RDD[...], b: RDD[...]) = {
  val c = a.map(...)
  c.persist(StorageLevel.MEMORY_ONLY_SER)
  var current = b
  for (_ <- 1 to 10) {
    val next = some_other_rdd_ops(c, current)
    next.persist(StorageLevel.MEMORY_ONLY)
    next.foreachPartition(x => {}) // materialize before unpersisting
    current.unpersist()
    current = next
  }
  current.saveAsTextFile(...)
}

【讨论】:

    【解决方案2】:

    缓存不会减少阶段,它只是不会每次都重新计算阶段。

    在第一次迭代中,在阶段的“输入大小”中,您可以看到数据来自 Hadoop,并且它读取 shuffle 输入。在随后的迭代中,数据来自内存,不再有随机输入。此外,执行时间也大大减少。

    每当必须写入 shuffle 时,都会创建新的 map 阶段,例如,当分区发生变化时,在您的情况下向 RDD 添加一个键。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-01-07
      • 1970-01-01
      • 2021-05-15
      • 1970-01-01
      • 2023-02-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多