【发布时间】:2018-05-13 11:41:22
【问题描述】:
我有一个项目正在使用来自 Kafka 的数据。显然,有几个字段将包含在标题中,我也需要为每条消息阅读这些字段。 Flink 目前有没有办法做到这一点?
谢谢!
【问题讨论】:
标签: java apache-kafka apache-flink
我有一个项目正在使用来自 Kafka 的数据。显然,有几个字段将包含在标题中,我也需要为每条消息阅读这些字段。 Flink 目前有没有办法做到这一点?
谢谢!
【问题讨论】:
标签: java apache-kafka apache-flink
@Jicaar,实际上 Kafka 从 0.11.0.0 版本开始添加了 Header 概念。 https://issues.apache.org/jira/browse/KAFKA-4208
问题是flink-connector-kafka-0.11_2.11 附带flink-1.4.0,据说支持kafka-0.11.0.0 只是在从kafka 读取时忽略消息头。
不幸的是,除非您在 flin 中实现自己的 KafkaConsumer,否则无法读取这些标头。
我也有兴趣阅读 kafka 消息头,希望 Flink 团队能增加对此的支持。
【讨论】:
我遇到了类似的问题,并在 Flink 1.8 中找到了解决方法。这是我写的:
FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer("topic", new JSONKeyValueDeserializationSchema(true){
ObjectMapper mapper = new ObjectMapper();
@Override
public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
ObjectNode result = super.deserialize(record);
if (record.headers() != null) {
Map<String, JsonNode> headers = StreamSupport.stream(record.headers().spliterator(), false).collect(Collectors.toMap(h -> h.key(), h -> (JsonNode)this.mapper.convertValue(new String(h.value()), JsonNode.class)));
result.set("headers", mapper.convertValue(headers, JsonNode.class));
}
return result;
}
}, kafkaProps);
希望这会有所帮助!
【讨论】:
KafkaDeserializationSchema,而不是覆盖JSONKeyValueDeserializationSchema。