【发布时间】:2019-03-21 12:14:28
【问题描述】:
我正在使用 Apache Flink,并尝试使用 Apache Kafka 协议连接到 Azure eventthub 以接收来自它的消息。我设法连接到 Azure eventthub 并接收消息,但我无法使用此处所述的 flink 功能“setStartFromTimestamp(...)” (https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration)。 当我试图从时间戳中获取一些消息时,Kafka 说代理端的消息格式在 0.10.0 之前。 有人面临这个吗? Apache Kafka 客户端版本为 2.0.1 Apache Flink 版本为 1.7.2
更新:尝试在消费者包中使用 Azure-Event-Hub 快速入门示例 (https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java) 添加代码以获取时间戳偏移,如果消息版本低于 0.10.0 kafka 版本,它将按预期返回 null。
List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionToTimestampMap);
System.out.println(offsetAndTimestamp);
【问题讨论】:
-
看起来您回复了自己的帖子?
-
不,因为 Azure Event Hub 应该有 kafka api 1.0 版本,根据他们的文档,它大于 0.10.0。我刚刚确认它不适用于他们的简单示例
-
好像不是,或者有bug
标签: apache-kafka apache-flink azure-eventhub