【发布时间】:2018-04-14 01:25:56
【问题描述】:
我有一个 spark 程序,基本上可以做到这一点:
def foo(a: RDD[...], b: RDD[...]) = {
val c = a.map(...)
c.persist(StorageLevel.MEMORY_ONLY_SER)
var current = b
for (_ <- 1 to 10) {
val next = some_other_rdd_ops(c, current)
next.persist(StorageLevel.MEMORY_ONLY)
current.unpersist()
current = next
}
current.saveAsTextFile(...)
}
我看到的奇怪行为是对应于val c = a.map(...) 的火花阶段发生了 10 次。由于下一行会立即缓存,我本来希望这种情况只会发生一次,但事实并非如此。当我查看正在运行的作业的“存储”选项卡时,很少有 c 的分区被缓存。
此外,该阶段的 10 个副本立即显示为“活动”。 val next = some_other_rdd_ops(c, current)对应的stage的10个副本显示为pending,它们大致交替执行。
我是否误解了如何让 Spark 缓存 RDD?
编辑:这是一个包含重现此程序的要点:https://gist.github.com/jfkelley/f407c7750a086cdb059c。它期望输入图的边列表(带有边权重)。例如:
a b 1000.0
a c 1000.0
b c 1000.0
d e 1000.0
d f 1000.0
e f 1000.0
g h 1000.0
h i 1000.0
g i 1000.0
d g 400.0
要点的第 31-42 行对应于上面的简化版本。当我只期望 1 个阶段时,我得到了对应于第 31 行的 10 个阶段。
【问题讨论】:
-
我认为您的期望是正确的。也许代码有问题?你能提供一个我们可以重现问题的例子吗?一种可能的解释是,当您继续将内容放入缓存时,它会推出
c。不过我不确定是不是这样。 -
Daniel 关于缓存被驱逐的猜测是正确的。此外,some_other_rdd_ops 对我们来说是一个黑匣子……所以它可能会做一些意想不到的事情。
-
我会更多地研究您的
current.unpersist()声明。你确定 c 永远不会成为当前的吗? -
@marios,是的,我确定。 c 和 current 无论如何都有不同的类型。 @JustinPihony,some_other_rdd_ops 是:
c.join(current.map(...)).aggregateByKey(...).mapValues(...)。没有persist/unpersist、collect、saveToTextFile等。 -
@DanielDarabos 当然,我添加了一个完全可执行的示例来重现这一点。抱歉,这有点复杂;这就是我最初发布简化版本的原因。
标签: scala apache-spark rdd