【发布时间】:2016-04-13 22:40:28
【问题描述】:
我正在使用以下代码从主题中读取消息。我面临两个问题。 每当我启动消费者时,它正在读取队列中的所有消息? 如何只读取未读消息?
from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
consumer.commit()
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
【问题讨论】:
-
我认为你必须在阅读后
consumer.commit()。 -
感谢@KenjiNoguchi,我尝试使用 consumer.commit() 仍然无法正常工作。任何提示
标签: python apache-kafka kafka-python