【问题标题】:Kafka Consumer: How to programatically consume from specific offset in Go SaramaKafka 消费者:如何以编程方式从 Go Sarama 中的特定偏移量消费
【发布时间】:2020-05-04 23:23:45
【问题描述】:

最近,我开始学习使用kafka。我正在进行的项目使用 sarama

对于阅读消息,我使用ConsumerGroup

如果foo 返回false,我需要在一段时间后再次阅读该消息。如何做到这一点?

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

    for message := range claim.Messages() {

            if ok := foo(message); ok {
                session.MarkMessage(message, "")
            } else {
                // ???
            }

    }

    return nil
}

【问题讨论】:

  • 你不能。这不是卡夫卡的工作方式。如果您不再提交任何消息并重新启动,您将能够从上次提交的偏移量开始使用。可能最简单的事情就是再次发布消息。
  • 如果我不想发布新消息,那么要再次阅读该消息,我是否必须每次都创建一个新的ConsumeGroup,并设置一个偏移量?

标签: go apache-kafka kafka-consumer-api sarama


【解决方案1】:

您可以通过在您的 Consumer Group 的 Setup() 回调中包含以下内容,将 Consumer Group 的偏移量重置为较旧的偏移量:

func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {
    sess.ResetOffset(topic, partition, offset, "")

    return nil
}

你也可以通过控制台实现:

kafka-consumer-groups \
    --bootstrap-server localhost:9092 \
    --group my-consumer-group \
    --topic myTopicName \
    --reset-offsets \
    --to-offfset 100 \
    --execute

【讨论】:

  • 要应用偏移量更改,我需要在哪里调用Setup 方法?
  • @rogatzkij 这是您的消费者组的回电
  • 在创建会话之前调用。所以我必须使用consumerGroup.Consume 重新创建会话来移动偏移量。我理解正确吗?
  • @rogatzkij 是的。
猜你喜欢
  • 1970-01-01
  • 2017-07-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-04-04
  • 1970-01-01
  • 2015-03-25
相关资源
最近更新 更多