【问题标题】:Control message offset in Kafka consumer控制 Kafka 消费者中的消息偏移量
【发布时间】:2017-04-05 05:21:36
【问题描述】:

实现是在 Python 中。使用 confluent_kafka。

我有一个消费者对象来轮询来自 kafka 主题的消息。这些消息用于其他大对象的进一步处理,由于大小,我无法在每次消息处理后备份该对象。

我定期转储对象,然后手动提交消费者。下面是我实现的示例代码。

from confluent_kafka import Consumer, KafkaError, TopicPartition

c = Consumer({
    'bootstrap.servers': 'myserver',
    'group.id': 'mygroup',
    'default.topic.config': {'auto.offset.reset': 'smallest'},
    'enable.auto.commit': "false"
})
c.subscribe(['mytopic'])

offsets = {} 

for i in range(10):
    msg = c.poll()

    if msg.error():
        continue

    par = msg.partition()
    off = msg.offset() 
    offsets[p] = off 

c.commit(async=False) 

print(offsets) 

当我第二次运行此代码时,我希望消息偏移量(如果来自同一个分区)应该是下一个,即 +1,与打印的前一个偏移量相比。

但是偏移量进步了很多。还有几百个。

我也尝试手动分配位置如下:

lst_part = []

for par, off in offsets.items():
    lst_part.append(TopicPartition('mytopic', par, off))

c.assign(lst_part)

# then start polling messages

新轮询的消息不是分配的偏移量+1。

【问题讨论】:

    标签: python apache-kafka kafka-consumer-api


    【解决方案1】:

    c.commit(async=False) 将提交已通过poll() 调用从客户端返回消息的所有已使用分区。

    如果您想要更精细地控制要提交的偏移量,您可以将显式 [TopicPartition(..)] 列表传递给 commit()(确保提交 last_message_offset+1)或禁用 auto.offset.store 并显式调用 store_offsets()您希望为将来的commit() 呼叫存储的消息/偏移量。

    请注意,store_offsets() 仅在 master 上可用,并且在 confluent-kafka-python 客户端的已发布版本中尚不可用,但很快就会可用。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-03-22
      • 1970-01-01
      • 2019-05-01
      • 2018-02-03
      • 1970-01-01
      • 2017-05-12
      • 2020-08-28
      相关资源
      最近更新 更多