【问题标题】:How to set consumer to start from a specific offset in Golang Kafka 10如何设置消费者从 Golang Kafka 10 中的特定偏移量开始
【发布时间】:2017-02-01 15:46:49
【问题描述】:

我的需要是让生产者从它崩溃前处理的最后一条消息开始。幸好我的情况是只有一个topic,一个partition,一个consumer。

为此,我尝试了https://github.com/Shopify/sarama,但它似乎还不可用。 我现在使用https://godoc.org/github.com/bsm/sarama-cluster,它允许我提交每个消息偏移量。

我无法检索最后提交的偏移量 我不知道如何制作一个 sarama consumer 从所述偏移量开始。目前我发现的唯一参数是Config.Producer.Offsets.Initial

  1. 如何检索最后提交的偏移量?
  2. 如何让消费者从最后一条offset已经提交的消息开始? OffsetNewest 将使它从产生的最后一条消息开始,而不是消费者最后处理的消息。
  3. 是否可以仅使用 Shopify/sarama 而不是 bsm/sarama-cluster 这样做?

提前致谢

附:我使用的是 Kafka 10.0,所以偏移量是存储在 kafka 中而不是 zookeeper 中。

EDIT1: 部分解决方案:获取自 sarama.OffsetOldest 以来的所有消息并跳过所有消息,直到找到未处理的消息。

【问题讨论】:

    标签: go apache-kafka sarama


    【解决方案1】:

    如果已经为分区保存了偏移量,sarama-cluster 将从该偏移量恢复消费。 Config.Producer.Offsets.Initial 选项仅在不存在保存的偏移量时使用(第一次为消费者组运行)。

    您可以通过在 main() 函数的开头添加以下行来验证这一点:

    sarama.Logger = log.New(os.Stdout, "sarama: ", log.LstdFlags)
    

    然后你会在输出中看到类似下面的内容:

    cluster/consumer CID-17db1be4-a162-411c-a106-4d198191176a consume sample/0 from 12

    其中的 12 是 Sarama 将从该分区 (sample/0) 开始的偏移量。

    【讨论】:

      猜你喜欢
      • 2019-04-04
      • 1970-01-01
      • 2019-04-11
      • 2019-01-18
      • 2017-07-22
      • 2017-12-13
      • 1970-01-01
      • 2016-02-14
      • 2020-05-17
      相关资源
      最近更新 更多