【问题标题】:how to view kafka headers如何查看kafka标头
【发布时间】:2019-08-06 09:59:30
【问题描述】:

我们使用标头向 Kafka 发送消息 org.apache.kafka.clients.producer.ProducerRecord

public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
    this(topic, partition, (Long)null, key, value, headers);
}

我如何才能使用命令实际查看这些标题。 kafka-console-consumer.sh 仅向我显示有效负载而没有标头。

【问题讨论】:

    标签: apache-kafka kafka-producer-api


    【解决方案1】:

    从 kafka-2.7.0 开始,您可以通过提供属性 print.headers=true 在控制台消费者中启用打印标题

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic quickstart-events --property print.key=true --property print.headers=true --property print.timestamp=true

    【讨论】:

      【解决方案2】:

      您也可以为此使用kafkactl。例如。输出为 yaml:

      kafkactl consume my-topic --print-headers -o yaml
      

      样本输出:

      partition: 1
      offset: 22
      headers:
        key1: value1
        key2: value2
      value: my-value
      

      免责声明:我是这个项目的贡献者

      【讨论】:

      • kafkactl 轻松安装在 Centos 7 上,简单易用,谢谢
      【解决方案3】:

      您可以使用出色的kafkacat 工具。

      示例命令:

      kafkacat -b kafka-broker:9092 -t my_topic_name -C \
        -f '\nKey (%K bytes): %k
        Value (%S bytes): %s
        Timestamp: %T
        Partition: %p
        Offset: %o
        Headers: %h\n'
      

      示例输出:

      Key (-1 bytes):
        Value (13 bytes): {foo:"bar 5"}
        Timestamp: 1548350164096
        Partition: 0
        Offset: 34
        Headers: __connect.errors.topic=test_topic_json,__connect.errors.partition=0,__connect.errors.offset=94,__connect.errors.connector.name=file_sink_03,__connect.errors.task.id=0,__connect.errors.stage=VALU
      E_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Co
      nverting byte[] to Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed
       due to serialization error:
      

      kafkacat 标头选项仅在 kafkacat 的最新版本中可用;如果您当前的版本不包含它,您可能想自己从 master 分支 build


      你也可以从 Docker 运行 kafkacat:

      docker run --rm edenhill/kafkacat:1.5.0 \
            -b kafka-broker:9092 \
            -t my_topic_name -C \
            -f '\nKey (%K bytes): %k
        Value (%S bytes): %s
        Timestamp: %T
        Partition: %p
        Offset: %o
        Headers: %h\n'
      

      如果您使用 Docker,请记住如何访问 Kafka 代理的网络影响。

      【讨论】:

      • 非常感谢。
      • 它是否适用于 kafka 2.3.0?我的 kafkacat 是 1.3.1,它返回 % ERROR: Unsupported formatter: %h
      • 在 AK 2.3 上运行良好。我正在使用 kafkacat Version 1.3.1-52-gc7986d (JSON) (librdkafka 1.0.0 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins)。您是否尝试过从 master 构建?
      • 除了是一个很棒的工具(感谢@RobinMoffatt!)之外,它的另一个好处是它可以通过 apt-get 和 brew 安装。
      【解决方案4】:

      来自kafka-console-consumer.sh 脚本:

      exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
      

      源代码:https://github.com/apache/kafka/blob/2.1.1/bin/kafka-console-consumer.sh

      kafka.tools.ConsoleConsumer 中,标头提供给格式化程序,但现有的格式化程序都没有使用它:

      formatter.writeTo(new ConsumerRecord(msg.topic, msg.partition, msg.offset, msg.timestamp,
                                           msg.timestampType, 0, 0, 0, msg.key, msg.value, msg.headers),
                                           output)
      

      src:https://github.com/apache/kafka/blob/2.1.1/core/src/main/scala/kafka/tools/ConsoleConsumer.scala

      在上述链接的底部,您可以看到现有的格式化程序。

      如果你想打印标题,你需要实现自己的kafka.common.MessageFormatter,尤其是它的 write 方法:

      def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit
      

      然后使用 --formatter 运行您的控制台使用者,提供您自己的格式化程序(它也应该存在于类路径中)。

      另一种更简单、更快捷的方法是使用 KafkaConsumer 实现您自己的小程序并在调试中检查标头。

      【讨论】:

        猜你喜欢
        • 2020-07-30
        • 2017-10-29
        • 2011-05-24
        • 2021-09-22
        • 2021-11-15
        • 1970-01-01
        • 1970-01-01
        • 2020-09-25
        相关资源
        最近更新 更多