【发布时间】:2017-02-01 15:46:49
【问题描述】:
我的需要是让生产者从它崩溃前处理的最后一条消息开始。幸好我的情况是只有一个topic,一个partition,一个consumer。
为此,我尝试了https://github.com/Shopify/sarama,但它似乎还不可用。 我现在使用https://godoc.org/github.com/bsm/sarama-cluster,它允许我提交每个消息偏移量。
我无法检索最后提交的偏移量
我不知道如何制作一个 sarama consumer 从所述偏移量开始。目前我发现的唯一参数是Config.Producer.Offsets.Initial。
- 如何检索最后提交的偏移量?
- 如何让消费者从最后一条offset已经提交的消息开始?
OffsetNewest将使它从产生的最后一条消息开始,而不是消费者最后处理的消息。 - 是否可以仅使用 Shopify/sarama 而不是 bsm/sarama-cluster 这样做?
提前致谢
附:我使用的是 Kafka 10.0,所以偏移量是存储在 kafka 中而不是 zookeeper 中。
EDIT1: 部分解决方案:获取自 sarama.OffsetOldest 以来的所有消息并跳过所有消息,直到找到未处理的消息。
【问题讨论】:
标签: go apache-kafka sarama