【问题标题】:How to connect to elasticsearch 5.x in Apache Flink如何在 Apache Flink 中连接到 elasticsearch 5.x
【发布时间】:2017-01-04 00:37:07
【问题描述】:

Apache Flink 版本是 1.1.3,elasticsearch 是 5.1.1。

flink 的文档只解释了 elasticsearch 2.x API (flink-connector-elasticsearch2_2.1.1) https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/elasticsearch2.html

是否没有弹性搜索 5.x 的 flink 连接器 API?

我尝试将此版本用于 elasticsearch 5.x,但遇到如下错误

Flink 异常

01/03/2017 20:01:21 Job execution switched to status FAILING.
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
    at java.lang.Thread.run(Thread.java:745)

elasticsearch 日志

[2017-01-03T20:01:21,642][WARN ][o.e.h.n.Netty4HttpServerTransport] [7CTs2-R] caught exception while handling client http traffic, closing connection [id: 0xbce51ef2, L:/127.0.0.1:9200 - R:/127.0.0.1:58429]
java.io.IOException: The current connection has been interrupted by a remote host
    at sun.nio.ch.SocketDispatcher.read0(Native Method) ~[?:?]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) ~[?:?]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:?]
    at sun.nio.ch.IOUtil.read(IOUtil.java:197) ~[?:?]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[?:?]
    at io.netty.buffer.PooledHeapByteBuf.setBytes(PooledHeapByteBuf.java:261) ~[netty-buffer-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100) ~[netty-buffer-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:366) ~[netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:118) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:536) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:490) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450) [netty-transport-4.1.6.Final.jar:4.1.6.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) [netty-common-4.1.6.Final.jar:4.1.6.Final]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]

【问题讨论】:

    标签: elasticsearch apache-flink


    【解决方案1】:

    Apache Flink 1.1.3 不包含 Elasticsearch 5.x 的连接器。

    此类连接器的一些工作正在进行中(JIRAGithub PR),但尚未添加到 Flink 代码库中。

    您可以尝试从拉取请求作者的repository 构建连接器。

    【讨论】:

      【解决方案2】:

      您应该能够使用以下依赖项并使其工作。

      请注意,在我的 Mac 上,至少对我来说,flink 发布版本似乎不适用于 elasticsearch5 依赖项。因此,如果您可以将其降级为 1.3-SNAPSHOT,它应该可以工作。

      我能够让它工作并将事件发布到 Elastic Search 5.4。

      更改以下内容

      在 pom.xml 中
      1.3-快照

          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
              <version>${flink.version}</version>
          </dependency>
      

      在您使用 elasticsearch2 的 Java 代码中,将其更改为 5,如下所示

      导入 org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;

      在扩展 ElasticsearchSinkFunction 的类中添加此方法(如在 PopularPlaceInserter 中)以及现有的流程方法。确保您更改了 elasticsearch 的索引和映射名称类型,您应该能够运行该程序。

          public IndexRequest createIndexRequest(String element) {
              Map<String, String> json = new HashMap<>();
              json.put("data", element);
      
              return Requests.indexRequest()
                      .index("nyc-idx")
                      .type("popular-locations")
                      .source(json);
          }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-08-08
        • 2023-03-07
        • 1970-01-01
        相关资源
        最近更新 更多