【发布时间】:2017-05-04 09:48:14
【问题描述】:
我在地图操作中使用LongAccumulator 作为共享计数器。但似乎我没有正确使用它,因为工作节点上的计数器状态没有更新。这是我的计数器类的样子:
public class Counter implements Serializable {
private LongAccumulator counter;
public Long increment() {
log.info("Incrementing counter with id: " + counter.id() + " on thread: " + Thread.currentThread().getName());
counter.add(1);
Long value = counter.value();
log.info("Counter's value with id: " + counter.id() + " is: " + value + " on thread: " + Thread.currentThread().getName());
return value;
}
public Counter(JavaSparkContext javaSparkContext) {
counter = javaSparkContext.sc().longAccumulator();
}
}
据我了解的文档,当应用程序在多个工作节点中运行时,这应该可以正常工作:
累加器是仅通过关联和交换操作“添加”到的变量,因此可以有效地并行支持。它们可用于实现计数器(如在 MapReduce 中)或求和。 Spark 原生支持数值类型的累加器,程序员可以添加对新类型的支持。
但这是当计数器在 2 个不同的工作人员上递增时的结果,并且看起来状态未在节点之间共享:
INFO 计数器:在线程上使用 id:866 递增计数器:Executor 任务启动 worker-6 INFO 计数器:计数器的 id 为:866 的值是:线程上的 1:Executor 任务启动 worker-6
信息计数器:递增计数器,id:866 on thread:Executor task launch worker-0 INFO Counter: id: 866 的计数器值是: 1 on thread: Executor task launch worker-0
我对累加器的概念理解有误,还是我必须使用任何设置来启动任务?
【问题讨论】:
标签: java apache-spark