【发布时间】:2020-07-30 05:13:21
【问题描述】:
我的印象是RDD执行和缓存都是惰性的:也就是说,如果一个RDD被缓存了,并且只使用了一部分,那么缓存机制只会缓存那部分,而另一部分将被计算点播。
不幸的是,以下实验似乎表明并非如此:
val acc = new LongAccumulator()
TestSC.register(acc)
val rdd = TestSC.parallelize(1 to 100, 16).map { v =>
acc add 1
v
}
rdd.persist()
val sliced = rdd
.mapPartitions { itr =>
itr.slice(0, 2)
}
sliced.count()
assert(acc.value == 32)
运行它会产生以下异常:
100 did not equal 32
ScalaTestFailureLocation:
Expected :32
Actual :100
结果是整个 RDD 被计算出来,而不是每个分区中的前 2 个项目。这在某些情况下效率非常低(例如,当您需要快速确定 RDD 是否为空时)。理想情况下,缓存管理器应该允许缓存缓冲区被增量写入和随机访问,这个功能是否存在?如果没有,我应该怎么做才能让它发生? (最好使用现有的内存和磁盘缓存机制)
非常感谢您的意见
更新 1 看来 Spark 已经有 2 个类:
- ExternalAppendOnlyMap
- ExternalAppendOnlyUnsafeRowArray
支持对许多值进行更精细的缓存。更好的是,他们不依赖 StorageLevel,而是自己决定使用哪个存储设备。然而令我惊讶的是,它们不是 RDD/Dataset 直接缓存的选项,而不是 co-group/join/streamOps 或累加器。
【问题讨论】:
-
你有点要求 Spark 有千里眼。有点多。
-
不是特别的。 scala Stream 已经是一个惰性缓存的内存分区,添加磁盘溢出和自动重试/故障转移,您将拥有所描述的行为
-
但这不是 Stream。
-
另外:我从不要求 Spark 是“千里眼”来确定分区的性质:它始终是一个迭代器,没有例外,将其缓存为好像它是一个黑盒不会事情效率更高
-
是和不是。需要看大局。
标签: apache-spark rdd persistent-storage