【发布时间】:2018-05-20 11:52:18
【问题描述】:
我有一个包含 40 个分区的主题。设置是这样的:
def on_assign (c,ps):
for p in ps:
p.offset=0
print ps
c.assign(ps)
conf = {'bootstrap.servers': 'localhost:9092'
'enable.auto.commit' : False,
'group.id' : 'confluent_consumer',
'default.topic.config': {'auto.offset.reset': 'earliest'}
}
consumer = Consumer(**conf)
consumer.subscribe(['topic.source'], on_assign=on_assign)
msg = consumer.poll(timeout=100000)
print "Topic is %s: | Partition is %d: | Offset is : %d | key is :%s " % (msg.topic(), msg.partition(), msg.offset(), msg.key())
我想从偏移量 0 读取主题 topic.source 的所有分区。但我没有看到所有分区都发生这种情况。对于某些分区,它从我假设是提交的偏移量的特定偏移量读取,每次更改 group.id 也无济于事。无论提交的偏移量如何,我如何从头开始阅读该主题的所有分区?
我在on_assign() 中打印了ps,它为所有40 个分区打印了类似的内容:
[TopicPartition{topic=topic.source,partition=0,offset=0,error=None},TopicPartition{topic=topic.source,partition=1,offset=0,error=None}....] and so on
【问题讨论】:
标签: python apache-kafka kafka-consumer-api confluent-platform