【发布时间】:2017-04-05 05:21:36
【问题描述】:
实现是在 Python 中。使用 confluent_kafka。
我有一个消费者对象来轮询来自 kafka 主题的消息。这些消息用于其他大对象的进一步处理,由于大小,我无法在每次消息处理后备份该对象。
我定期转储对象,然后手动提交消费者。下面是我实现的示例代码。
from confluent_kafka import Consumer, KafkaError, TopicPartition
c = Consumer({
'bootstrap.servers': 'myserver',
'group.id': 'mygroup',
'default.topic.config': {'auto.offset.reset': 'smallest'},
'enable.auto.commit': "false"
})
c.subscribe(['mytopic'])
offsets = {}
for i in range(10):
msg = c.poll()
if msg.error():
continue
par = msg.partition()
off = msg.offset()
offsets[p] = off
c.commit(async=False)
print(offsets)
当我第二次运行此代码时,我希望消息偏移量(如果来自同一个分区)应该是下一个,即 +1,与打印的前一个偏移量相比。
但是偏移量进步了很多。还有几百个。
我也尝试手动分配位置如下:
lst_part = []
for par, off in offsets.items():
lst_part.append(TopicPartition('mytopic', par, off))
c.assign(lst_part)
# then start polling messages
新轮询的消息不是分配的偏移量+1。
【问题讨论】:
标签: python apache-kafka kafka-consumer-api