【问题标题】:Way to read data from Kafka headers in Apache Flink从 Apache Flink 中的 Kafka 标头读取数据的方法
【发布时间】:2018-05-13 11:41:22
【问题描述】:

我有一个项目正在使用来自 Kafka 的数据。显然,有几个字段将包含在标题中,我也需要为每条消息阅读这些字段。 Flink 目前有没有办法做到这一点?

谢谢!

【问题讨论】:

    标签: java apache-kafka apache-flink


    【解决方案1】:

    @Jicaar,实际上 Kafka 从 0.11.0.0 版本开始添加了 Header 概念。 https://issues.apache.org/jira/browse/KAFKA-4208

    问题是flink-connector-kafka-0.11_2.11 附带flink-1.4.0,据说支持kafka-0.11.0.0 只是在从kafka 读取时忽略消息头。

    不幸的是,除非您在 flin 中实现自己的 KafkaConsumer,否则无法读取这些标头。

    我也有兴趣阅读 kafka 消息头,希望 Flink 团队能增加对此的支持。

    【讨论】:

      【解决方案2】:

      我遇到了类似的问题,并在 Flink 1.8 中找到了解决方法。这是我写的:

      FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer("topic", new JSONKeyValueDeserializationSchema(true){
          ObjectMapper mapper = new ObjectMapper();
          @Override
          public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
              ObjectNode result = super.deserialize(record);
              if (record.headers() != null) {
                  Map<String, JsonNode> headers = StreamSupport.stream(record.headers().spliterator(), false).collect(Collectors.toMap(h -> h.key(), h -> (JsonNode)this.mapper.convertValue(new String(h.value()), JsonNode.class)));
                  result.set("headers", mapper.convertValue(headers, JsonNode.class));
              }
      
              return result;
          }
      }, kafkaProps);
      

      希望这会有所帮助!

      【讨论】:

      • 您也可以自己实现KafkaDeserializationSchema,而不是覆盖JSONKeyValueDeserializationSchema
      猜你喜欢
      • 2018-02-01
      • 2016-11-08
      • 2017-05-06
      • 2018-05-09
      • 2021-04-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-08-01
      相关资源
      最近更新 更多