【问题标题】:Kafka python consumer reading all the messages when startedKafka python消费者在启动时读取所有消息
【发布时间】:2016-04-13 22:40:28
【问题描述】:

我正在使用以下代码从主题中读取消息。我面临两个问题。 每当我启动消费者时,它正在读取队列中的所有消息? 如何只读取未读消息?

from kafka import KafkaConsumer


consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    consumer.commit() 
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

【问题讨论】:

  • 我认为你必须在阅读后consumer.commit()
  • 感谢@KenjiNoguchi,我尝试使用 consumer.commit() 仍然无法正常工作。任何提示

标签: python apache-kafka kafka-python


【解决方案1】:

正如@Kenji 所说,您必须使用consumer.commit() 提交偏移量。如果您不想手动提交,您可以通过将enable_auto_commit=True 传递给您的KafkaConsumer 来启用自动提交。您可能还想调整auto_commit_interval_ms,这是每次自动提交之间的时间间隔(以毫秒为单位)。见这里:http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html

【讨论】:

猜你喜欢
  • 2021-10-11
  • 2020-04-13
  • 2017-11-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-01-04
  • 1970-01-01
  • 2017-08-31
相关资源
最近更新 更多