【问题标题】:How to consume from latest offset using Sarama Go Kafka Consumer如何使用 Sarama Go Kafka Consumer 从最新的偏移量中消费
【发布时间】:2019-07-16 13:33:48
【问题描述】:

我有三个问题:

  1. “最旧的偏移量”是什么意思?最旧的偏移量并不意味着偏移量 0?

// OffsetOldest 代表代理上可用的最旧偏移量
// 分区。
OffsetOldest int64 = -2

  1. 假设

    A.三个代理在同一台机器上运行
    B. 消费组只有一个消费线程
    C. 消费者配置 OffsetOldest 标志。
    D. 已经产生了 100 条消息,目前消费者线程已经消耗了 90 条消息。

    那么如果消费者线程重新启动,那么这个消费者将从哪个偏移量开始消费呢?是 91 还是 0?

  2. 在我们下面的代码中,似乎每次启动消费者时都会重新使用消息。但实际上它并不总是发生。为什么重用只是在重启后发生几次(不是全部)?

     func (this *consumerGroupHandler) ConsumeClaim(session 
     sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
              for message := range claim.Messages() {
              this.handler(message)
             session.MarkMessage(message, "")
        }
    
        return nil
    }
    
    ctx := context.Background()
    conf := sarama.NewConfig()
    
    conf.Version = sarama.V2_0_0_0
    conf.Consumer.Offsets.Initial = sarama.OffsetOldest
    conf.Consumer.Return.Errors = true
    
    consumer, err := sarama.NewConsumerGroup(strings.Split(app.Config().KafkaBrokers, ","), groupId, conf)
    if err != nil {
        logger.Error("NewConsumerGroupFromClient(%s) error: %v", groupId, err)
        return
    }
    

【问题讨论】:

    标签: go apache-kafka kafka-consumer-api sarama


    【解决方案1】:
    1. 没有。应用保留策略时,会从主题中删除较旧的消息。因此,最早的偏移量可能不是第一个偏移量(即0)。

    2. 这取决于您的配置。本质上,您有 3 个选项:

      • earliest 偏移量开始消费
      • latest 偏移量开始消费
      • 从特定偏移量开始消费
    3. 你必须使用sarama.OffsetOldest。来自documentation

     const (
            // OffsetNewest stands for the log head offset, i.e. the offset that will be
            // assigned to the next message that will be produced to the partition. You
            // can send this to a client's GetOffset method to get this offset, or when
            // calling ConsumePartition to start consuming new messages.
            OffsetNewest int64 = -1
            // OffsetOldest stands for the oldest offset available on the broker for a
            // partition. You can send this to a client's GetOffset method to get this
            // offset, or when calling ConsumePartition to start consuming from the
            // oldest offset that is still available on the broker.
            OffsetOldest int64 = -2
        )
    

    【讨论】:

    • 我找到了答案。对于问题#2,除非您的消费偏移已过期,否则您不会从早期/最新偏移中消费!!!
    猜你喜欢
    • 1970-01-01
    • 2018-06-28
    • 1970-01-01
    • 1970-01-01
    • 2019-06-10
    • 2021-08-09
    • 2020-05-07
    • 2018-07-01
    • 1970-01-01
    相关资源
    最近更新 更多