【问题标题】:Does Spark discard ephemeral rdds immediately?Spark 会立即丢弃临时 rdds 吗?
【发布时间】:2016-03-18 14:53:07
【问题描述】:

几个来源默认将 RDD 描述为 短暂(例如,this s/o answer)——这意味着它们不会留在内存中,除非我们对其调用 cache() 或 persist()。 p>

假设我们的程序涉及一个短暂的(未由用户显式缓存的)RDD,该RDD 用于导致RDD 实现的一些操作。 我的问题是:Spark discard 物化的临时 RDD 立即 - 或者 RDD 是否有可能保留在内存中以进行其他操作,即使我们从未要求它被缓存?

另外,如果临时 RDD 留在内存中,是否总是因为某些 LRU 策略尚未将其踢出 - 或者也可能是因为调度优化?

我试图用下面这样的代码来解决这个问题——在 4 核机器上使用带有 python 3.5 和 spark 1.6.0 的 Jupyter notebook 运行——但我希望知道的人回答当然。

import pyspark
sc = pyspark.SparkContext()
N = 1000000   # size of dataset
THRESHOLD = 100  # some constant

def f():
    """ do not chache """
    rdd = sc.parallelize(range(N))
    for i in range(10):
        print(rdd.filter(lambda x: x > i * THRESHOLD).count())

def g():
    """ cache """
    rdd = sc.parallelize(range(N)).cache()
    for i in range(10):
        print(rdd.filter(lambda x: x > i * THRESHOLD).count())

对于上面的两个函数, f() 不要求 rdd 保持 - 但 g() 在开始时会。当我对 foo() 和 boo() 这两个函数计时时,两者的性能非常相似,就好像 cache() 调用没有任何区别。 (其实用缓存的比较慢)。

%%timeit
f()
> 1 loops, best of 3: 2.19 s per loop

%%timeit
g()
> 1 loops, best of 3: 2.7 s per loop

实际上,即使修改 f() 以在 RDD 上调用 unpersist() 也不会改变任何事情。

def ff():
    """ modified f() with explicit call to unpersist() """
  rdd = sc.parallelize(range(N))
  for i in range(10):
    rdd.unpersist()
    print(rdd.filter(lambda x: x > i * THRESHOLD).count())

%%timeit
ff()
> 1 loops, best of 3: 2.25 s per loop

unpersist() 的文档声明它“将 [s] RDD 标记为非持久的,并从内存和磁盘中删除 [s] 它的所有块”。 但是,真的是这样吗?还是 Spark 知道它会在以后使用 RDD 时忽略对 unpersist 的调用?

【问题讨论】:

    标签: python caching apache-spark pyspark rdd


    【解决方案1】:

    在这里缓存没有任何价值。从range 创建RDD 非常便宜(每个分区只需要两个整数即可),并且您应用的操作不能真正从缓存中受益。 persist 应用于 Java 对象而不是 Python 对象,并且您的代码在 RDD 创建和第一次转换之间不执行任何工作。

    即使您忽略所有这些,这也是一项非常简单的任务,数据很少。总成本很可能是由日程安排和沟通决定的。

    如果您想查看缓存的实际效果,请考虑以下示例:

    from pyspark import SparkContext
    import time
    
    def f(x):
       time.sleep(1)
        return x
    
    sc = SparkContext("local[5]")
    rdd = sc.parallelize(range(50), 5).map(f)
    rdd.cache()
    
    %time rdd.count()   # First run, no data cached ~10 s
    ## CPU times: user 16 ms, sys: 4 ms, total: 20 ms
    ## Wall time: 11.4 s
    ## 50
    
    %time rdd.count()  # Second time, task results fetched from cache
    ## CPU times: user 12 ms, sys: 0 ns, total: 12 ms
    ## Wall time: 114 ms
    ## 50
    
    rdd.unpersist()  # Data unpersisted
    
    %time rdd.count()  #  Results recomputed ~10s
    ## CPU times: user 16 ms, sys: 0 ns, total: 16 ms 
    ## Wall time: 10.1 s
    ## 50
    

    虽然在像这样的简单情况下,一个持久行为是可以预测的,但一般缓存应该被视为提示而不是合同。任务输出可以根据可用资源保留或不保留,并且可以在没有任何用户干预的情况下从缓存中逐出。

    【讨论】:

    • 您好,谢谢您的回复。澄清一下,您是说 spark 确实丢弃了临时 RDD——但我应用的操作非常便宜,以至于我看不出有什么区别?
    • 没有临时 RDD 这样的东西,因为 RDD 仅作为驱动程序上的一个小对象存在。使用标准语言工具超出范围时会丢弃任务结果。但是忽略技术细节,这里没有什么可以缓存的。在您称其为您的数据时,您的数据基本上是 n range 对象,其中 n 是分区数。
    • 嗨 - 我在这里使用具有特殊含义的术语“短暂” - 即用户不要求保留的rdds。我不仅对特定示例感兴趣,那是为了说明我的问题。我的问题是 rdds 何时从集群节点内存中被丢弃,如果我们不要求它们持久存在的话。
    • 参见例如stackoverflow.com/questions/34117469/… RDD 不是数据(尽管我们倾向于这样认为)。工人只能看到标准迭代器。不过,可以保留随机播放文件(请参阅stackoverflow.com/questions/34580662/…)。但是再次在您的代码中没有什么可以缓存
    • 谢谢。我可能遗漏了一些东西,但现在我没有看到标题问题的直接答案。我会仔细查看您链接到的帖子并尝试更好地理解。
    猜你喜欢
    • 2021-05-27
    • 1970-01-01
    • 2016-08-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多