【问题标题】:Why does worker node not see updates to accumulator on another worker nodes?为什么工作节点看不到另一个工作节点上累加器的更新?
【发布时间】:2017-05-04 09:48:14
【问题描述】:

我在地图操作中使用LongAccumulator 作为共享计数器。但似乎我没有正确使用它,因为工作节点上的计数器状态没有更新。这是我的计数器类的样子:

public class Counter implements Serializable {

   private LongAccumulator counter;

   public Long increment() {
      log.info("Incrementing counter with id: " + counter.id() + " on thread: " + Thread.currentThread().getName());
      counter.add(1);
      Long value = counter.value();
      log.info("Counter's value with id: " + counter.id() + " is: " + value + " on thread: " + Thread.currentThread().getName());
      return value;
   }

   public Counter(JavaSparkContext javaSparkContext) {
      counter = javaSparkContext.sc().longAccumulator();
   }
}

据我了解的文档,当应用程序在多个工作节点中运行时,这应该可以正常工作:

累加器是仅通过关联和交换操作“添加”到的变量,因此可以有效地并行支持。它们可用于实现计数器(如在 MapReduce 中)或求和。 Spark 原生支持数值类型的累加器,程序员可以添加对新类型的支持。

但这是当计数器在 2 个不同的工作人员上递增时的结果,并且看起来状态未在节点之间共享:

INFO 计数器:在线程上使用 id:866 递增计数器:Executor 任务启动 worker-6 INFO 计数器:计数器的 id 为:866 的值是:线程上的 1:Executor 任务启动 worker-6
信息计数器:递增计数器,id:866 on thread:Executor task launch worker-0 INFO Counter: id: 866 的计数器值是: 1 on thread: Executor task launch worker-0

我对累加器的概念理解有误,还是我必须使用任何设置来启动任务?

【问题讨论】:

    标签: java apache-spark


    【解决方案1】:

    shouldn't work:

    然后可以使用 add 方法将在集群上运行的任务添加到其中。但是,他们无法读取其值。只有驱动程序可以读取累加器的值,使用它的 value 方法。

    每个任务都有自己的累加器,在本地更新,并在任务完成并报告结果后与驱动程序上的“共享”副本合并。

    在任务中使用value 时,旧的Accumulator API(现在包装AccumulatorV2)实际上引发了异常,但由于某种原因,它在AccumulatorV2 中被省略了。

    你所经历的实际上与这里描述的旧行为How to print accumulator variable from within task (seem to "work" without calling value method)?

    【讨论】:

      【解决方案2】:

      来自the answer from @user6910411(强调我的):

      每个任务都有自己的累加器,它在本地更新,并在任务完成并报告结果后与驱动程序上的“共享”副本合并

      答案中粗体部分不是 100% 正确的。

      内部和外部累加器的当前值会发送给驱动程序,每次执行程序心跳必须定期发生或驱动程序假定执行程序丢失。

      定期间隔由spark.executor.heartbeatInterval 属性控制,默认为10s

      每个执行者对驱动程序的心跳之间的间隔。 Heartbeats 让驱动程序知道执行程序仍然活着,并使用正在进行的任务的指标对其进行更新。 spark.executor.heartbeatInterval 应该明显小于 spark.network.timeout

      如上所述,心跳是“传输层”,用于将累加器(在执行器上)的部分更新传递给驱动程序。

      有两种类型的累加器——内部的和非内部的(由于缺少更合适的名称,我将把非内部累加器称为非内部的)。

      内部累加器用于 Spark 用来让管理员/操作员知道幕后发生的事情的任务指标。

      Spark 用于向非内部累加器发送部分更新的机制相同,因此每次执行器心跳时驱动程序都可以看到对累加器(在运行任务的执行器上)的本地更新。

      对此我不确定,但驱动程序可能不会将它们提供给代码(= 给外界),但主要是要知道驱动程序知道累加器的当前值(由执行程序延迟心跳)。


      顺便说一句,问题在于工作节点是累加器更新的边界,但实际上这只是一项任务,它为累加器更新创建可见性边界。如果您有一个或两个工作节点(具有一个或多个执行器)并不重要,因为您也不会在单个执行器上看到跨任务的累加器更新。

      累加器更新对于任务来说是本地的,它由驱动程序和任务来判断累加器的任何更新。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2020-04-02
        • 2020-12-04
        • 1970-01-01
        • 1970-01-01
        • 2020-02-06
        • 2019-03-13
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多