【问题标题】:Apache Flink integration with ElasticsearchApache Flink 与 Elasticsearch 的集成
【发布时间】:2016-10-30 01:32:06
【问题描述】:

我正在尝试将 Flink 与 Elasticsearch 2.1.1 集成,我正在使用 maven 依赖项

     <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
        <version>1.1-SNAPSHOT</version>
    </dependency>

这是我从 Kafka 队列中读取事件的 Java 代码(工作正常),但不知何故,事件没有在 Elasticsearch 中发布,也没有错误,如果我在下面的代码中更改任何与 ElasticSearch 的端口、主机名、集群名称或索引名称相关的设置然后我立即看到一个错误,但目前它没有显示任何错误,也没有在 ElasticSearch 中创建任何新文档

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // parse user parameters
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

    messageStream.print();

    Map<String, String> config = new HashMap<>();
    config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
    config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1");

    config.put("cluster.name", "FlinkDemo");

    List<InetSocketAddress> transports = new ArrayList<>();
    transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));

    messageStream.addSink(new ElasticsearchSink<String>(config, transports, new TestElasticsearchSinkFunction()));

    env.execute();
}
private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<String> {
    private static final long serialVersionUID = 1L;

    public IndexRequest createIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("flink").id("hash"+element).source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}

【问题讨论】:

  • 由于您是在本地执行代码,我想您可以轻松地使用您的 IDE 运行它并使用调试器来查看发生了什么。例如,我会在 createIndexRequest 方法中设置一个断点,看看它是否被调用,以及之后会发生什么。
  • 嗨@rmetzger,我确实在本地机器上运行它并进行了调试,但是我唯一缺少的是正确配置日志记录,因为大多数弹性问题在“log.警告”声明。问题是 elasticsearch-2.2.1 客户端 API 中“BulkRequestHandler.java”中的异常,该异常引发了错误 -“org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: type is missing;”因为我创建了索引但不是我觉得很奇怪的类型,因为它应该主要关注索引而不是该索引的 _type 属性。

标签: java elasticsearch apache-flink


【解决方案1】:

我确实在本地机器上运行它并进行了调试,但是我唯一缺少的是正确配置日志记录,因为大多数弹性问题都在“log.warn”语句中描述。问题是 elasticsearch-2.2.1 客户端 API 中“BulkRequestHandler.java”中的异常,它引发错误 -“org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: type is missing;”因为我创建了索引但不是我觉得很奇怪的类型,因为它应该主要关注索引并默认创建类型。

【讨论】:

  • 您能详细解释一下您为使其正常工作所做的工作吗?
  • 您可以添加log4j.properties文件来记录完整的日志,然后会发现错误。这是我的canse中的es认证问题。
【解决方案2】:

我找到了一个很good example的Flink & Elasticsearch Connector

第一个 Maven 依赖项:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>

第二个示例 Java 代码

public static void writeElastic(DataStream<String> input) {

    Map<String, String> config = new HashMap<>();

    // This instructs the sink to emit after every element, otherwise they would be buffered
    config.put("bulk.flush.max.actions", "1");
    config.put("cluster.name", "es_keira");

    try {
        // Add elasticsearch hosts on startup
        List<InetSocketAddress> transports = new ArrayList<>();
        transports.add(new InetSocketAddress("127.0.0.1", 9300)); // port is 9300 not 9200 for ES TransportClient

        ElasticsearchSinkFunction<String> indexLog = new ElasticsearchSinkFunction<String>() {
            public IndexRequest createIndexRequest(String element) {
                String[] logContent = element.trim().split("\t");
                Map<String, String> esJson = new HashMap<>();
                esJson.put("IP", logContent[0]);
                esJson.put("info", logContent[1]);

                return Requests
                        .indexRequest()
                        .index("viper-test")
                        .type("viper-log")
                        .source(esJson);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        };

        ElasticsearchSink esSink = new ElasticsearchSink(config, transports, indexLog);
        input.addSink(esSink);
    } catch (Exception e) {
        System.out.println(e);
    }
}

【讨论】:

    猜你喜欢
    • 2020-04-15
    • 1970-01-01
    • 1970-01-01
    • 2017-01-23
    • 1970-01-01
    • 2020-08-15
    • 2021-05-15
    • 2016-04-27
    • 2019-02-12
    相关资源
    最近更新 更多