【发布时间】:2016-03-18 14:53:07
【问题描述】:
几个来源默认将 RDD 描述为 短暂(例如,this s/o answer)——这意味着它们不会留在内存中,除非我们对其调用 cache() 或 persist()。 p>
假设我们的程序涉及一个短暂的(未由用户显式缓存的)RDD,该RDD 用于导致RDD 实现的一些操作。 我的问题是:Spark discard 物化的临时 RDD 立即 - 或者 RDD 是否有可能保留在内存中以进行其他操作,即使我们从未要求它被缓存?
另外,如果临时 RDD 留在内存中,是否总是因为某些 LRU 策略尚未将其踢出 - 或者也可能是因为调度优化?
我试图用下面这样的代码来解决这个问题——在 4 核机器上使用带有 python 3.5 和 spark 1.6.0 的 Jupyter notebook 运行——但我希望知道的人回答当然。
import pyspark
sc = pyspark.SparkContext()
N = 1000000 # size of dataset
THRESHOLD = 100 # some constant
def f():
""" do not chache """
rdd = sc.parallelize(range(N))
for i in range(10):
print(rdd.filter(lambda x: x > i * THRESHOLD).count())
def g():
""" cache """
rdd = sc.parallelize(range(N)).cache()
for i in range(10):
print(rdd.filter(lambda x: x > i * THRESHOLD).count())
对于上面的两个函数, f() 不要求 rdd 保持 - 但 g() 在开始时会。当我对 foo() 和 boo() 这两个函数计时时,两者的性能非常相似,就好像 cache() 调用没有任何区别。 (其实用缓存的比较慢)。
%%timeit
f()
> 1 loops, best of 3: 2.19 s per loop
%%timeit
g()
> 1 loops, best of 3: 2.7 s per loop
实际上,即使修改 f() 以在 RDD 上调用 unpersist() 也不会改变任何事情。
def ff():
""" modified f() with explicit call to unpersist() """
rdd = sc.parallelize(range(N))
for i in range(10):
rdd.unpersist()
print(rdd.filter(lambda x: x > i * THRESHOLD).count())
%%timeit
ff()
> 1 loops, best of 3: 2.25 s per loop
unpersist() 的文档声明它“将 [s] RDD 标记为非持久的,并从内存和磁盘中删除 [s] 它的所有块”。 但是,真的是这样吗?还是 Spark 知道它会在以后使用 RDD 时忽略对 unpersist 的调用?
【问题讨论】:
标签: python caching apache-spark pyspark rdd