【问题标题】:Spark cache vs broadcastSpark缓存与广播
【发布时间】:2016-06-27 14:37:15
【问题描述】:

看起来广播方法在我的集群中制作了 RDD 的分布式副本。另一方面,cache() 方法的执行只是将数据加载到内存中。

但是我不明白缓存的RDD在集群中是如何分布的。

您能告诉我在什么情况下我应该使用rdd.cache()rdd.broadcast() 方法吗?

【问题讨论】:

标签: caching apache-spark


【解决方案1】:

cache()persist() 允许跨操作使用数据集。

当您持久化一个 RDD 时,每个节点都会将它计算的任何分区存储在内存中,并在对该数据集(或从它派生的数据集)的其他操作中重用它们。这使得未来的行动更快(通常超过 10 倍)。 缓存是迭代算法和快速交互使用的关键工具。

每个持久化的 RDD 都可以使用不同的存储级别进行存储,例如,允许您将数据集持久化在磁盘上,将其持久化在 内存 中,但作为序列化的 Java 对象(以节省空间), 跨节点复制它,或将其存储在堆外

Broadcast variables 允许程序员在每台机器上缓存一个只读变量,而不是随任务一起发送它的副本。例如,它们可用于以有效的方式为每个节点提供大型输入数据集的副本。 Spark 还尝试使用高效的广播算法分发广播变量以降低通信成本。

您可以在此documentation 页面找到更多详细信息。

有用的帖子:

Advantage of Broadcast Variables

What is the difference between cache and persist?

【讨论】:

    【解决方案2】:

    你能告诉我在什么情况下我应该使用 rdd.cache() 和 rdd.broadcast() 方法?

    RDD 被分成分区。这些分区本身充当整个 RDD 的不可变子集。当 Spark 执行图的每个阶段时,每个分区都会被发送到对数据子集进行操作的工作人员。反过来,如果 RDD 需要重新迭代,每个工作人员都可以缓存数据。

    广播变量用于向每个工人发送一些不可变状态一次。当你想要一个变量的本地副本时,你可以使用它们。

    这两个操作完全不同,每一个都代表不同问题的解决方案。

    【讨论】:

    • 那么是不是说广播变量在executor的磁盘中,cache RDD在executor的物理内存中?
    • @SurenderRaja 这就是我的理解。但请记住,如果 rdd.cache() 不适合 RAM,它可能会溢出到磁盘。
    【解决方案3】:

    您能告诉我在什么情况下应该使用 rdd.cache() 和 rdd.broadcast() 方法吗?

    让我们举个例子——假设您有一个employee_salary 数据,其中包含每个员工的部门和薪水。现在说任务是找到每个员工平均部门工资的分数。 (如果员工 e1 他的部门是 d1,我们需要找到 e1.salary/average(d1 中的所有薪水))。

    现在的一种方法是——首先将数据读入一个 rdd——比如 rdd1。然后依次做两件事*-

    首先,使用 rdd1* 计算部门平均工资。您最终将在驱动程序上获得部门平均工资结果 - 基本上是一个包含 deptId 与平均数的地图对象。

    其次,您需要使用此结果将每位员工的工资除以各自部门的平均工资。请记住,每个工人都可以有来自任何部门的员工,因此您需要访问每个工人的部门平均工资结果。这该怎么做?好吧,您可以将在驱动程序上获得的平均工资图通过广播发送给每个工人,然后可以将其用于计算 rdd1 中每个“行”的工资分数。

    缓存一个 RDD 怎么样?请记住,从最初的 rdd1 开始,有两个计算分支——一个用于计算部门平均值,另一个用于将这些平均值应用于 rdd 中的每个员工。现在,如果你不缓存 rdd1,那么对于上面的第二个任务,你可能需要再次返回磁盘读取并重新计算它,因为当你到达这一点时,spark 可能已经从内存中驱逐了这个 rdd。但是由于我们知道我们将使用相同的 rdd,我们可以要求 Spark 在第一次时将其保存在内存中。然后下次我们需要对其应用一些转换时,我们已经在内存中拥有它了。

    *我们可以使用基于部门的分区,这样您就可以避免广播,但为了说明的目的,假设我们不这样做。

    【讨论】:

      【解决方案4】:

      用例

      当您想多次使用某个对象时,您可以缓存或广播它。

      你只能缓存一个 RDD 或 RDD-derivative,而你可以广播任何类型的对象,包括 RDD。

      我们在处理 RDD/DataFrame/DataSet 时使用 cache(),并且我们希望多次使用数据集,而无需每次都重新计算。

      我们广播一个对象

      1. 我们正在处理相对较小的 RDD/DataFrame/DataSet,并且广播它比缓存提供性能优势(例如,如果我们在连接中使用数据集)
      2. 我们正在处理一个普通的旧 Scala/Java 对象,它将用于作业的多个阶段。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-07-08
        • 1970-01-01
        • 2019-10-23
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多