【问题标题】:Kafka how to read from __consumer_offsets topicKafka 如何从 __consumer_offsets 主题中读取
【发布时间】:2016-02-28 19:05:24
【问题描述】:

我正在尝试找出我当前的高级消费者正在解决哪些补偿问题。我使用 Kafka 0.8.2.1,在 Kafka 的 server.properties 中设置了 no "offset.storage" - 我认为这意味着偏移量存储在 Kafka 中。 (我还通过在 Zk shell 中检查此路径来验证 Zookeeper 中没有存储偏移量:/consumers/consumer_group_name/offsets/topic_name/partition_number

我试着听__consumer_offsets这个话题,看看哪个消费者保存了什么偏移值,但是没有用...

我尝试了以下方法:

为控制台使用者创建了一个配置文件,如下所示:

=> more kafka_offset_consumer.config 

 exclude.internal.topics=false

并尝试了两个版本的控制台消费者脚本:

#1:
bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181

#2
./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config

两者都不起作用 - 它只是坐在那里但不打印任何东西,即使消费者正在积极消费/保存偏移量。

我是否缺少其他一些配置/属性?

谢谢!

码头

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    如果你添加--from-beginning 应该很可能会给你一些结果,至少在我自己尝试的时候是这样。或者,如果您没有提供该参数,而是在您让该消费者收听时阅读了更多消息(并触发偏移提交),那么也应该在那里显示消息。

    【讨论】:

    • 谢谢,纪尧姆。不幸的是,添加 --from-beginning 并没有帮助 - 同样的行为....消费者只是坐在那里没有数据,当我 Cntr-C 它时 - 它说“0条消息”消耗:(
    【解决方案2】:

    好的,我已经找到问题所在了。我的 Kafka 实际上是使用 Zookeeper 作为偏移存储,而不是 Kafka ...。我没有立即检测到的原因是因为我错误地检查了 ZK 内容:

    我在做

    ls  /consumers/consumer_group_name/offsets/topic_name/partition_number
    

    那里什么也看不见。相反,我必须“获取”内容——这确实为我的消费者显示了正确的偏移量,如下所示:

    get /consumers/consumer_group_name/offsets/topic_name/partition_number 
    185530404
    cZxid = 0x70789ad05
    ctime = Mon Nov 23 17:49:46 GMT 2015
    mZxid = 0x7216cdc5c
    mtime = Thu Dec 03 20:18:57 GMT 2015
    pZxid = 0x70789ad05
    cversion = 0
    dataVersion = 3537384
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 9
    numChildren = 0
    

    【讨论】:

      【解决方案3】:

      我在尝试使用 __consumer_offsets 主题时遇到了这个问题。 我设法弄清楚了不同的 Kafka 版本,并认为我会分享我的发现

      对于 Kafka 0.8.2.x

      注意:这里使用 Zookeeper 连接

      #Create consumer config
      echo "exclude.internal.topics=false" > /tmp/consumer.config
      #Consume all offsets
      ./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
      --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \
      --zookeeper localhost:2181 --topic __consumer_offsets --from-beginning
      

      对于 Kafka 0.9.x.x 和 0.10.x.x

      #Create consumer config
      echo "exclude.internal.topics=false" > /tmp/consumer.config
      #Consume all offsets
      ./kafka-console-consumer.sh --new-consumer --consumer.config /tmp/consumer.config \
      --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
      --bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
      

      对于 0.11.x.x - 2.x

      #Create consumer config
      echo "exclude.internal.topics=false" > /tmp/consumer.config
      #Consume all offsets
      ./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
      --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
      --bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
      

      【讨论】:

      • 截至今日,格式化程序已更新为kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter
      • 对于较新的版本(我假设为 .10 或更高版本)考虑使用 --bootstrap-server:9092 而不是 --zookeeper:2181
      • 为什么需要exclude.internal.topics=false?它可以在没有该选项的情况下使用。
      • @KenjiNoguchi 因为它可以打印所有主题的所有组
      【解决方案4】:

      从 Kafka 0.11 开始,the (Scala) source code can be found here

      对于那些需要 Java 翻译的人,从任何 Consumer 进程,假设你得到一个ConsumerRecord<byte[], byte[]> consumerRecord,你可以使用

      1. 获取密钥,(首先检查密钥是否为空)并使用GroupMetadataManager.readMessageKey(consumerRecord.key)。这可以返回不同的类型,所以检查if ( ... instanceof OffsetKey),然后转换它,你可以从中得到各种值。

      2. 要获取offsets的Kafka记录值,可以使用String.valueOf(GroupMetadataManager.readOffsetMessageValue(consumerRecord.value))

      从 Scala 代码翻译的最小 Java 示例...

      byte[] key = consumerRecord.key;
      if (key != null) {
          Object o = GroupMetadataManager.readMessageKey(key);
          if (o != null && o instanceOf OffsetKey) {
              OffsetKey offsetKey = (OffsetKey) o;
              Object groupTopicPartition = offsetKey.key;
              byte[] value = consumerRecord.value;
              String formattedValue = String.valueOf(GroupMetadataManager.readOffsetMessageValue(value);
             // TODO: Print, store, or compute results with the new key and value 
          }
      }
      

      注意,也可以使用 AdminClient API 来描述组,而不是使用这些原始消息


      Scala 源代码摘录

      def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
        Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
          // Only print if the message is an offset record.
          // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
          case offsetKey: OffsetKey =>
            val groupTopicPartition = offsetKey.key
            val value = consumerRecord.value
            val formattedValue =
              if (value == null) "NULL"
              else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
            output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
            output.write("::".getBytes(StandardCharsets.UTF_8))
            output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
            output.write("\n".getBytes(StandardCharsets.UTF_8))
          case _ => // no-op
        }
      

      【讨论】:

        【解决方案5】:

        对于 Kafka-2.X,使用以下命令

        kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"

        【讨论】:

          猜你喜欢
          • 2019-07-07
          • 1970-01-01
          • 2019-10-25
          • 2019-05-14
          • 2020-07-07
          • 2016-07-01
          • 2017-01-24
          相关资源
          最近更新 更多