【问题标题】:Getting listener timeout after waiting for [10000] ms from ElasticsearchIO从 ElasticsearchIO 等待 [10000] 毫秒后获取侦听器超时
【发布时间】:2017-03-10 14:33:52
【问题描述】:

我正在尝试测试一个简单的 Apache Beam 代码,其源代码为 Elasticsearch。我从git repo 中找到了 ElasticsearchIO 源类。

我修改了 Beam 的 MinimalWordCount 示例,将源包含为 Elasticsearch 而不是 TextIO。以下是要点,

String[] hosts = new String[1];
hosts[0]="http://localhost:9200";
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p.apply(
      ElasticsearchIO.read().withConnectionConfiguration(
        ElasticsearchIO.ConnectionConfiguration.create(hosts, "test_index", "users").withUsername("esuser").withPassword("password")
      )
 )
.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
         for (String word : c.element().split("[^a-zA-Z']+")) { 
           if (!word.isEmpty()) {
             c.output(word);
           }
         }
      }
}));
p.run().waitUntilFinish();

如果我运行代码,

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalWordCount \
 -Pdirect-runner

我遇到错误

执行 Java 类时发生异常。空值: InvocationTargetException:java.io.IOException:监听器超时后 等待 [10000] 毫秒

我调试了ElasticsearchIO.java 并且可以看到一切正常,Elasicsearch 客户端已构建并且代码正在检索索引中的数据。但是读取转换后的 ParDo 函数根本不会执行。 Elasticsearch 客户端一直在等待,最后得到一个超时错误。

我知道 Beam 的 Elasicsearch 连接器仍在开发中。但是谁能帮忙找出我做错了什么?

PS:我在本地运行 Elasticsearch 5.2.1。

【问题讨论】:

    标签: java elasticsearch google-cloud-dataflow apache-beam


    【解决方案1】:

    当前版本的 ElasticsearchIO (beam-0.6.0 2017/03) 尚不支持 Elasticsearch 版本 5。 https://issues.apache.org/jira/browse/BEAM-1637

    跟踪这方面的进展

    【讨论】:

      【解决方案2】:

      超时问题确实是由于当前的 Elasticsearch IO 与 ES v5.x 不兼容。但是不清除滚动在 ES 方面是有代价的:段合并过程(从较小的段中创建更大的段并删除较小的段)被保持,因为 ES 在滚动上下文正在使用旧段时无法删除它们。

      此外,还有一件事:IO.read 将文档返回为 Json,因此拆分 ParDo 也可能会拆分字段名称,而不仅仅是字段值。

      【讨论】:

        【解决方案3】:

        你可以运行 mvn -X 来获得详细的描述吗? 我在弹性论坛上发现了这个 https://discuss.elastic.co/t/es5-indexing-performance-seems-slow/65084/22 https://discuss.elastic.co/t/es5-correct-restclient-failurelistener-behaviour-retry-logic/68211/3

        您或许应该通过超时进行更多调查

        【讨论】:

          【解决方案4】:

          问题在于执行删除滚动的代码。如果我在 ElasticsearchIO.java 中注释该代码,则管道运行良好。

          restClient.performRequest(
                       "DELETE",
                       "/_search/scroll",
                       Collections.<String, String>emptyMap(),
                       entity,
                       new BasicHeader("", ""));
          

          【讨论】:

            【解决方案5】:

            直接涉及收到的异常的问题,标题为“Getting listener timeout after waiting for [10000] ms from ElasticsearchIO”,最近已关闭并显示消息:“已通过添加 ES 5.x 支持 (https://github.com/apache/beam/pull/3703) 解决”。

            这个错误现在仍然表现出来吗?

            【讨论】:

              猜你喜欢
              • 2018-12-22
              • 2019-09-16
              • 2018-12-21
              • 1970-01-01
              • 2015-01-05
              • 2014-05-10
              • 1970-01-01
              • 1970-01-01
              • 2017-02-02
              相关资源
              最近更新 更多