【问题标题】:Understanding Spark's caching了解 Spark 的缓存
【发布时间】:2015-04-27 18:48:38
【问题描述】:

我试图了解 Spark 的缓存是如何工作的。

这是我的幼稚理解,如果我遗漏了什么,请告诉我:

val rdd1 = sc.textFile("some data")
rdd1.cache() //marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")

在上面,rdd1 只会从磁盘(例如 HDFS)加载一次。 (我假设保存 rdd2 时),然后在保存 rdd3 时从缓存(假设有足够的 RAM)中)

现在这是我的问题。假设我想缓存 rdd2 和 rdd3 因为它们都将在以后使用,但在创建它们之后我不需要 rdd1。

基本上有重复,不是吗?由于一旦计算了 rdd2 和 rdd3 ,我就不再需要 rdd1 了,我可能应该取消它,对吧?问题是什么时候?

这行得通吗? (选项 A)

val rdd1 = sc.textFile("some data")
rdd1.cache()   // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()

spark 是否将 unpersist 调用添加到 DAG?还是立即完成?如果它立即完成,那么当我从 rdd2 和 rdd3 读取时,基本上 rdd1 将不会被缓存,对吗?

我应该这样做(选项 B)吗?

val rdd1 = sc.textFile("some data")
rdd1.cache()   // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)

rdd2.cache()
rdd3.cache()

rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")

rdd1.unpersist()

所以问题是这样的: 选项A是否足够好?即rdd1 是否仍然只加载文件一次? 还是我需要选择选项 B?

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    似乎需要选项 B。原因与Spark如何执行persist/cache和unpersist有关。由于 RDD 转换仅构建 DAG 描述而不执行,因此在选项 A 中,当您调用 unpersist 时,您仍然只有作业描述,而不是正在运行的执行。

    这是相关的,因为 cachepersist 调用只是将 RDD 添加到 RDD 的 Map 中,这些 RDD 将自己标记为在作业执行期间持久化。但是,unpersist 直接告诉 blockManager 将 RDD 从存储中逐出,并删除持久 RDD 的 Map 中的引用。

    persist function

    unpersist function

    因此,您需要在 Spark 实际执行并使用块管理器存储 RDD 后调用 unpersist。

    RDD.persist 方法的 cmets 暗示了这一点: rdd.persist

    【讨论】:

    • 是的,看来你在上面。这有点不幸,我希望“缓存”能够被转换为 DAG 操作,而不仅仅是将 RDD ID 添加到地图中......在很多情况下,您想要在中间缓存一些东西,创建一个新的 RDD,然后丢弃旧的。也许有很好的理论理由说明为什么这不是一个好主意……无论如何,缓存的 LRU(我假设)排序意味着如果 rdd2 和 rdd3 需要该空间进行缓存,则旧的未使用的 rdd1 将被驱逐...
    • 所以我主要研究了persist/cache和unpersist在做什么,但是当你从另一个RDD派生RDD以及它如何优化时,仍然有空间考虑Spark在做什么。我不确定rdd1 是否需要缓存,rdd2rdd3 在缓存或 DAG 流水线化时可能会对其进行检查点。不过,这对我来说更像是一个灰色地带。
    • 通过调试器进行了更多调查和跟踪。 rdd2rdd3 将引用 rdd1 作为依赖项。 rdd1 将在执行第一个操作时将其数据加载到分区中。现在rdd2rdd3 都将它们的转换应用于分区中rdd1 已经加载的数据。我相信如果您在同一个确切的 RDD 上运行多个操作,缓存会提供价值,但在这种新分支 RDD 的情况下,我认为您不会遇到同样的问题,因为我相信 Spark 知道 rdd1 仍然是第一次保存后rdd3 的依赖关系。
    【解决方案2】:

    在选项A中,您在调用操作时没有显示(调用保存)

    val rdd1 = sc.textFile("some data")
    rdd.cache() //marks rdd as cached
    val rdd2 = rdd1.filter(...)
    val rdd3 = rdd1.map(...)
    rdd2.cache()
    rdd3.cache()
    rdd1.unpersist()
    rdd2.saveAsTextFile("...")
    rdd3.saveAsTextFile("...")
    

    如果顺序如上,选项 A 应该使用 rdd1 的缓存版本来计算 rdd2 和 rdd 3

    【讨论】:

    • 应该,我同意,但是会吗?我认为不会,因为当您调用 rdd2.saveAsTestFile 等时, rdd1 已被标记为未持久化。持久化/非持久化不在 DAG 上
    • 在您调用 saveAsFile 之前,没有任何事情发生>
    【解决方案3】:

    选项 B 是一种带有小幅调整的最佳方法。使用成本较低的行动方法。在您的代码中提到的方法中, saveAsTextFile 是一项昂贵的操作,请用 count 方法替换它。

    这里的想法是从 DAG 中删除大的 rdd1,如果它与进一步的计算无关(在创建 rdd2 和 rdd3 之后)

    从代码更新方法

    val rdd1 = sc.textFile("some data").cache()
    val rdd2 = rdd1.filter(...).cache() 
    val rdd3 = rdd1.map(...).cache()
    
    rdd2.count
    rdd3.count
    
    rdd1.unpersist()
    

    【讨论】:

    • 调用一个动作只是为了使转换真正发生不是有效的方法/做法吗? @Shivaprasad
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-01-14
    • 2011-03-12
    • 1970-01-01
    • 2020-08-03
    • 2020-03-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多