【问题标题】:read __consumer_offsets with kafka go用 kafka go 读取 __consumer_offsets
【发布时间】:2019-10-25 23:57:56
【问题描述】:

我想使用这个库阅读主题__consumer_offsets:https://github.com/segmentio/kafka-go

我的问题是,除非我指定一个分区,否则似乎什么都不会发生。默认情况下这个主题有100个分区,查询kafka的分区列表然后循环读取它们似乎是不合理的,我希望库中有一个预先存在的方法来读取来自所有分区的消息在主题中。

目前以下工作,在我用kafkacat验证__consumer_offsets主题的分区15中有消息后:

  r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"kafka:9092"},
    Topic:     "__consumer_offsets",
    Partition: 15
  })
  r.SetOffset(0)

  for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
      log.Println("Error while trying to read message")
      log.Fatal(err)
      break
    }
    log.Printf("message at offset %d\n", m.Offset)
  }

  r.Close()

我认为分区选择应该在用户级别上是透明的,除非需要。我错了吗?

有没有办法从主题中读取消息,而不管消息在哪个分区中?或者换个说法,从所有分区中读取?

【问题讨论】:

    标签: go apache-kafka


    【解决方案1】:

    使用消费者组API,不需要给分区。

    https://github.com/segmentio/kafka-go#consumer-groups

    // GroupID holds the optional consumer group id.  If GroupID is specified, then
    // Partition should NOT be specified e.g. 0
    GroupID string
    
    
    // Partition to read messages from.  Either Partition or GroupID may
    // be assigned, but not both
    Partition int
    

    https://godoc.org/github.com/segmentio/kafka-go#ReaderConfig

    【讨论】:

    • 我不明白你的回答。我不想使用来自具有组 ID 的主题的消息,我想从 __consumer_offsets 读取所有组的偏移量。您向我指出的文档部分明确表示在使用组 ID 时它将返回 -1。另外,提交抵消会破坏整个目的
    • 您的问题是询问如何不提供分区号。如果您阅读该库的配置文档,您可以提供单个分区或消费者 groupId,以便使用所有分区。该属性不控制您在主题本身中阅读的内容。
    • 我有点迷茫,可能是我还没有100%理解kafka。除非我使用组 ID,否则实现不允许我阅读整个主题的机制的逻辑是什么?使用 kafkacat 时我可以在没有人的情况下做到这一点,那为什么不进去呢?就我而言,我想阅读__consumer_offsets 主题,因为我想对它们做一些事情(而不改变它们),所以使用一个组并提交偏移量完全违背了目的。以这种方式实施是否有原因?
    • 另一件事,我刚刚尝试了你的建议,它创建了一个无限循环读取__consumer_offsets,自动提交一个新的偏移量,并读取刚刚提交的偏移量
    • 该机制称为“消费者群体”。 Kafkacat(和控制台消费者)为您随机生成一个。如果您想在没有无限循环的情况下阅读该主题,则必须关闭自动提交。不过,我不知道非 Java 属性
    猜你喜欢
    • 2016-02-28
    • 2019-01-04
    • 2019-07-07
    • 2019-05-14
    • 1970-01-01
    • 1970-01-01
    • 2017-01-24
    相关资源
    最近更新 更多