【问题标题】:In Apache Spark, can I incrementally cache an RDD partition?在 Apache Spark 中,我可以增量缓存 RDD 分区吗?
【发布时间】:2020-07-30 05:13:21
【问题描述】:

我的印象是RDD执行和缓存都是惰性的:也就是说,如果一个RDD被缓存了,并且只使用了一部分,那么缓存机制只会缓存那部分,而另一部分将被计算点播。

不幸的是,以下实验似乎表明并非如此:

      val acc = new LongAccumulator()
      TestSC.register(acc)

      val rdd = TestSC.parallelize(1 to 100, 16).map { v =>
        acc add 1
        v
      }

      rdd.persist()

      val sliced = rdd
        .mapPartitions { itr =>
          itr.slice(0, 2)
        }

      sliced.count()

      assert(acc.value == 32)

运行它会产生以下异常:

100 did not equal 32
ScalaTestFailureLocation: 
Expected :32
Actual   :100

结果是整个 RDD 被计算出来,而不是每个分区中的前 2 个项目。这在某些情况下效率非常低(例如,当您需要快速确定 RDD 是否为空时)。理想情况下,缓存管理器应该允许缓存缓冲区被增量写入和随机访问,这个功能是否存在?如果没有,我应该怎么做才能让它发生? (最好使用现有的内存和磁盘缓存机制)

非常感谢您的意见

更新 1 看来 Spark 已经有 2 个类:

  • ExternalAppendOnlyMap
  • ExternalAppendOnlyUnsafeRowArray

支持对许多值进行更精细的缓存。更好的是,他们不依赖 StorageLevel,而是自己决定使用哪个存储设备。然而令我惊讶的是,它们不是 RDD/Dataset 直接缓存的选项,而不是 co-group/join/streamOps 或累加器。

【问题讨论】:

  • 你有点要求 Spark 有千里眼。有点多。
  • 不是特别的。 scala Stream 已经是一个惰性缓存的内存分区,添加磁盘溢出和自动重试/故障转移,您将拥有所描述的行为
  • 但这不是 Stream。
  • 另外:我从不要求 Spark 是“千里眼”来确定分区的性质:它始终是一个迭代器,没有例外,将其缓存为好像它是一个黑盒不会事情效率更高
  • 是和不是。需要看大局。

标签: apache-spark rdd persistent-storage


【解决方案1】:

事后看来很有趣,这是我的看法:

  • 您不能增量缓存。所以你的问题的答案是否定的。

  • persist 是该 RDD 的所有分区的 RDD,用于多个 Action 或单个 Action,从同一公共 RDD 阶段开始进行多次处理。

  • 如果您使用persist,rdd 优化器不会看到如何优化它。你发出了那个调用、方法、api,所以它执行它。

  • 但是,如果您不使用 persist,惰性求值和 Stage 内的代码融合,似乎将切片基数和 acc 联系在一起。这很清楚。是否合乎逻辑,是的,因为在其他地方没有进一步的参考作为另一个行动的一部分。其他人可能会认为它很奇怪或错误。但这并不意味着 imo 增量持久性/缓存。

所以,恕我直言,我不会想出有趣的观察结果,也不相信它证明了关于部分缓存的任何事情。

【讨论】:

  • 持久性在那里准确地展示了差异。如果不存在,则 acc 的值将是预期的 32
  • 有趣,需要考虑一下。
猜你喜欢
  • 1970-01-01
  • 2016-08-01
  • 1970-01-01
  • 1970-01-01
  • 2017-07-20
  • 1970-01-01
  • 2020-01-27
  • 2017-04-05
  • 2016-11-29
相关资源
最近更新 更多