【问题标题】:Flink Kafka connector to eventhubFlink Kafka 连接器到 eventthub
【发布时间】:2019-03-21 12:14:28
【问题描述】:

我正在使用 Apache Flink,并尝试使用 Apache Kafka 协议连接到 Azure eventthub 以接收来自它的消息。我设法连接到 Azure eventthub 并接收消息,但我无法使用此处所述的 flink 功能“setStartFromTimestamp(...)” (https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration)。 当我试图从时间戳中获取一些消息时,Kafka 说代理端的消息格式在 0.10.0 之前。 有人面临这个吗? Apache Kafka 客户端版本为 2.0.1 Apache Flink 版本为 1.7.2

更新:尝试在消费者包中使用 Azure-Event-Hub 快速入门示例 (https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/java) 添加代码以获取时间戳偏移,如果消息版本低于 0.10.0 kafka 版本,它将按预期返回 null。

        List<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
        List<TopicPartition> topicPartitions = partitionInfos.stream().map(pi -> new TopicPartition(pi.topic(), pi.partition())).collect(Collectors.toList());
        Map<TopicPartition, Long> topicPartitionToTimestampMap = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = consumer.offsetsForTimes(topicPartitionToTimestampMap);
        System.out.println(offsetAndTimestamp);

【问题讨论】:

  • 看起来您回复了自己的帖子?
  • 不,因为 Azure Event Hub 应该有 kafka api 1.0 版本,根据他们的文档,它大于 0.10.0。我刚刚确认它不适用于他们的简单示例
  • 好像不是,或者有bug

标签: apache-kafka apache-flink azure-eventhub


【解决方案1】:

很抱歉我们错过了这个。 EH 现在支持 Kafka offsetsForTimes()(以前不支持)。

以后请随时针对我们的 Github 提出问题。 https://github.com/Azure/azure-event-hubs-for-kafka

【讨论】:

    猜你喜欢
    • 2023-03-07
    • 1970-01-01
    • 2017-06-27
    • 2017-02-21
    • 1970-01-01
    • 1970-01-01
    • 2021-04-15
    • 1970-01-01
    • 2021-09-09
    相关资源
    最近更新 更多