【问题标题】:Confluent Kafka: Consumer does not read from beginning for all partitions in a topicConfluent Kafka:消费者不会从头开始读取主题中的所有分区
【发布时间】:2018-05-20 11:52:18
【问题描述】:

我有一个包含 40 个分区的主题。设置是这样的:

def on_assign (c,ps):
    for p in ps:
        p.offset=0
    print ps
    c.assign(ps)

conf = {'bootstrap.servers': 'localhost:9092'
        'enable.auto.commit' : False,
        'group.id' : 'confluent_consumer',
        'default.topic.config': {'auto.offset.reset': 'earliest'}
        }
consumer = Consumer(**conf)
consumer.subscribe(['topic.source'], on_assign=on_assign)

msg = consumer.poll(timeout=100000)
print "Topic is %s: | Partition is %d: | Offset is : %d | key is :%s " % (msg.topic(), msg.partition(), msg.offset(), msg.key())

我想从偏移量 0 读取主题 topic.source 的所有分区。但我没有看到所有分区都发生这种情况。对于某些分区,它从我假设是提交的偏移量的特定偏移量读取,每次更改 group.id 也无济于事。无论提交的偏移量如何,我如何从头开始阅读该主题的所有分区?

我在on_assign() 中打印了ps,它为所有40 个分区打印了类似的内容:

[TopicPartition{topic=topic.source,partition=0,offset=0,error=None},TopicPartition{topic=topic.source,partition=1,offset=0,error=None}....] and so on

【问题讨论】:

标签: python apache-kafka kafka-consumer-api confluent-platform


【解决方案1】:

如果您使用将group.id 设置为新值或使用未提交任何偏移量的组并将auto.offset.reset 设置为earliest,那么消费者将从分区的开头开始。

也就是说,开头可能不是偏移量 0。根据您的代理的日志保留设置,Kafka 可以删除消息,因此您分区中的第一条可用消息可以是任何偏移量。

【讨论】:

  • 嗨,Mickael,感谢您的回答。是否有任何命令/工具可以知道主题分区的起始偏移量是多少(以防之前的消息由于保留策略而被删除)?
  • 如果要查看broker,可以进入kafka的log.dirs,找到分区目录。里面应该有一个*.log 文件。文件名应指明第一个偏移量。例如,如果您看到00000000000000000216.log,那么 216 是第一个偏移量。根据您的设置,可能会有多个日志文件,请取最小的名称。
  • 嗨,迈克尔!谢谢,我找到了 log.dirs 并且可以看到 *.log 文件
猜你喜欢
  • 1970-01-01
  • 2017-10-17
  • 1970-01-01
  • 2022-01-20
  • 1970-01-01
  • 2022-11-19
  • 1970-01-01
  • 2020-07-18
  • 2022-06-14
相关资源
最近更新 更多