【问题标题】:How can I commit a message offset to Kafka topic after consumption?消费后如何向Kafka主题提交消息偏移量?
【发布时间】:2021-10-25 16:29:48
【问题描述】:

似乎没有提交偏移量。

我正在使用 kafka Python 包。这是我的代码

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'quickstart-events',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    auto_commit_interval_ms=1000,
)

msg_pack = consumer.poll(max_records=10,timeout_ms=500,update_offsets=True)
for tp,messages in msg_pack.items():
    for message in messages:
        print("%s:%d:%d: key=%s value=%s" % (tp.topic, tp.partition,
                                             message.offset, message.key,
                                             message.value))

但问题是我总是从一开始就收到所有消息。有时我什么也得不到。你能帮帮我吗?

quickstart-events:0:52: key=None value=b'1'
quickstart-events:0:53: key=None value=b'2'
quickstart-events:0:54: key=None value=b'3'
quickstart-events:0:55: key=None value=b'4'
quickstart-events:0:56: key=None value=b'5'
quickstart-events:0:57: key=None value=b'1'
quickstart-events:0:58: key=None value=b'2'
quickstart-events:0:59: key=None value=b'3'
quickstart-events:0:60: key=None value=b'4'
quickstart-events:0:61: key=None value=b'5'

【问题讨论】:

  • 我会将您的问题改写为仅限 Kafka 的问题,因为它不涉及 AWS 的任何方面或 SQS 的独特方面
  • Kafka 和 SQS 并不完全等同。很难说出你的期望

标签: python apache-kafka kafka-python


【解决方案1】:

我总是从一开始就收到所有的消息。

这应该只发生一次,基于这些

auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',

有时我没有得到任何东西

那么您可能在消费者组中没有滞后(没有要消费的记录)。您需要使用 kafka-consumer-groups --describe --group my-group 来检查延迟或偏移量是否以您期望的方式提交。

如果偏移量没有按照您期望的方式提交,那么您应该禁用自动提交并手动进行

【讨论】:

  • 感谢您的回答,但它不起作用。您能否编辑我们在处理每条消息后如何更新偏移量?
  • 什么不起作用? kafka-consumer-groups 命令行界面?您是否尝试过在循环中调用consumer.commit
  • 是的,我尝试在循环中添加consumer.commit()。它的工作原理谢谢
猜你喜欢
  • 2018-06-14
  • 2022-11-13
  • 1970-01-01
  • 2014-10-07
  • 1970-01-01
  • 1970-01-01
  • 2017-05-12
  • 1970-01-01
  • 2017-08-22
相关资源
最近更新 更多