【问题标题】:How do I know which partition to consume in Kafka?我如何知道在 Kafka 中使用哪个分区?
【发布时间】:2019-08-23 03:20:25
【问题描述】:

我在 Kafka 代理中有一个包含 3 个分区的主题 1个分区有消息,2个分区是空的,我怎么知道一次调用消耗哪个分区?

首先我分配一个分区等于kafka.PartitionAny的TopicPartition,但是这个值一直返回-1

所以我必须手动使用计数器,当我从一个分区成功消费但消息为空时,然后count++ 并从下一个开始,直到找到消息


for{

    partitions = append(partitions, kafka.TopicPartition{
        Topic:     &topic,
        Partition: partition,
        Offset:    offSet,
        Error:     err,
    })


    err = c.Assign(partitions)
    if err != nil {
         return err
    }

    // retrieve message
    ev, err := c.Poll(-1)
    if err != nil {
         return err
    }

    // if no message, check the next partition
    if ev == nil{
         partition++
    }else{
         break
    }

}

前两轮不返回任何消息,但要等待第三轮返回,有什么办法可以自动检测到哪个分区存储了未消费的消息?

如果没有其他办法,Kafka可以为我做轮询路由吗?或者我必须自己记录计数器

谢谢! :)

【问题讨论】:

  • 如果您只想消费所有消息而不知道消息将存储在哪个分区,则可以使用Subscribe() 而不是Assign()
  • @max23_感谢您的回复!但是如果我使用 Subscribe(),我该如何手动提交偏移量?
  • 需要自己手动处理偏移量吗?有一个名为 enable.auto.commit 的消费者配置默认设置为 true,它将定期在后台提交偏移量。
  • @max23_ 是的,我需要自己处理偏移量:( 订阅方法似乎无法指定要消耗的偏移量
  • 哦,我看到你使用 subscribe 方法时有一个 rebalance 回调。它将返回您的 AssignedPartitions 或 RevokedPartitions 具有TopicPartition 列表的事件。可能可以像处理 AssignedPartitions 事件一样尝试一下。

标签: go apache-kafka kafka-consumer-api messagebroker


【解决方案1】:

您绝对应该使用 subscribe() 方法,然后调用 poll()。 如果有可用记录,您将获得包含一条或多条记录(与不同分区关联)的 Records 答案。

处理记录后,您可以使用记录中包含的数据(主题、分区、偏移量等)手动提交(如果您使用enable.auto.commit = false)。

亚尼克

【讨论】:

  • 但是我仍然可以指定使用subscribe 时要消耗的偏移量,对吧?谢谢!
  • 是的,你仍然可以使用 kafkaConsumer.seek() 来指定你瞄准的偏移量
猜你喜欢
  • 1970-01-01
  • 2021-07-14
  • 1970-01-01
  • 2020-12-25
  • 1970-01-01
  • 1970-01-01
  • 2012-01-02
  • 2015-08-23
相关资源
最近更新 更多