【问题标题】:How do I get the the offset of last message of a Kafka topic using confluent-kafka-python?如何使用 confluent-kafka-python 获取 Kafka 主题的最后一条消息的偏移量?
【发布时间】:2022-02-20 07:51:11
【问题描述】:

我需要使用 confluent-kafka-python 检索一个主题的最后 N 条消息。

我已经阅读https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html# 一天了,但没有找到任何合适的方法来获取最后一条消息的偏移量,因此我无法计算消费者开始时的偏移量。

请帮忙。谢谢!

【问题讨论】:

    标签: python apache-kafka confluent-kafka-python


    【解决方案1】:

    您需要消费者的get_watermark_offsets() 功能。你用TopicPartition 的列表调用它,它会为每个分区返回一个元组(int, int)(低,高)。

    https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer.get_watermark_offsets

    类似这样的:

    
    from confluent_kafka import Consumer, TopicPartition
    
    # create the Consumer with a connection to your brokers
    
    topic_name = "my.topic"
    
    topicparts = [TopicPartition(topic_name, i) for i in range(0, 8)]
    
    offsets = consumer.get_watermark_offsets(topicparts)
    
    for p in enumerate(offsets):
        msg = "partition {p} starting offset {so} last offset {lo}"
        print(msg.format(p=p, so=offsets[p][0], lo=offsets[p][1]))
    

    【讨论】:

      猜你喜欢
      • 2017-12-12
      • 2021-05-12
      • 1970-01-01
      • 2018-04-20
      • 1970-01-01
      • 2016-11-20
      • 1970-01-01
      • 1970-01-01
      • 2016-05-27
      相关资源
      最近更新 更多