【发布时间】:2020-05-04 23:23:45
【问题描述】:
最近,我开始学习使用kafka。我正在进行的项目使用 sarama。
对于阅读消息,我使用ConsumerGroup。
如果foo 返回false,我需要在一段时间后再次阅读该消息。如何做到这一点?
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
if ok := foo(message); ok {
session.MarkMessage(message, "")
} else {
// ???
}
}
return nil
}
【问题讨论】:
-
你不能。这不是卡夫卡的工作方式。如果您不再提交任何消息并重新启动,您将能够从上次提交的偏移量开始使用。可能最简单的事情就是再次发布消息。
-
如果我不想发布新消息,那么要再次阅读该消息,我是否必须每次都创建一个新的
ConsumeGroup,并设置一个偏移量?
标签: go apache-kafka kafka-consumer-api sarama