【问题标题】:Sharing Zookeeper configuration on multiple Spark Executors在多个 Spark Executor 上共享 Zookeeper 配置
【发布时间】:2016-07-06 19:06:52
【问题描述】:

我有一个Zookeeper写的配置信息。我正在使用 Apache Curator 通过 Curator Watcher 读取配置(如果有更好的读取解决方案,我很乐意使用它),因此如果 Zookeeper 中的配置发生更改,我将收到新的配置。我在 Spark 中使用这个配置。如何将其共享给同一应用程序的所有 Spark 执行器?

谢谢!

乐:

谢谢迪凯,

在以下代码中,您将在哪里执行观察程序?我是 spark 新手,我不确定每个工人的工作情况。

谢谢!

final JavaDStream<ElementMessage> nodeMessageStream = mapWithStateDistinctAndFiltered.flatMap(pair -> pair._2.buildElementMessages())
            .filter(f -> f != null);

    nodeMessageStream.foreachRDD(rdd -> {
        rdd.foreachPartition(r -> {
            final ElementRecordRestClient rest = new ElementRecordRestClient(
                    startProps.getProperty(InputPropertyKey.WEPAPP_URL.toString()));
            r.forEachRemaining(message -> {
                rest.createObject(message.toElementRecord());
            });
        });
    });

【问题讨论】:

  • 感谢您的回答。我已经编辑了最初的帖子。

标签: java apache-spark apache-zookeeper apache-curator


【解决方案1】:

在这种情况下,我要做的是在主节点上运行 Curator Watcher,并使用 Spark 的广播变量将配置广播到所有执行程序。每当配置更改时,您都会停止当前的流式传输上下文,并使用新配置启动一个新的流式传输上下文。这将确保您的结果始终一致。

另一种方法是在 foreachPartition lambda 函数中读取 zookeeper 配置。但是由于配置是每个分区独立读取的,所以同一个RDD的不同分区可能会得到不同的配置,这可能不是你所期望的。

【讨论】:

  • 有趣的解决方案,第一个但流的停止和启动让我感到困扰。这到底是什么意思,我怎样才能停止和启动流(手动除外)?谢谢!
  • 您必须通过在当前上下文对象上调用stop 来手动停止处理。然后创建一个新的流上下文并通过调用start 启动它。如果您的输入源像一个持久队列,那么当新的流式传输上下文启动时,它将从前一个上下文停止的地方恢复。
  • 感谢您的帮助。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-01-15
  • 1970-01-01
相关资源
最近更新 更多