【问题标题】:How to produce messages to selected partition using kafka-console-producer?如何使用 kafka-console-producer 向选定分区生成消息?
【发布时间】:2014-12-20 14:46:49
【问题描述】:

根据 Kafka 文档:

生产者负责选择将哪条消息分配给主题内的哪个分区。

如何使用kafka-console-producer.sh 向选定的分区发送消息?

我想在发送消息时指定某种“分区 ID”。

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    这是您的起点:
    partitioner.class 在您的 Properties 实例中设置。在 Kafka 中,默认实现是kafka.producer.DefaultPartitioner

    该设置的目标是:

    用于在子主题之间划分消息的分区器类。默认分区器基于键的哈希。

    这意味着如果你想改变默认分区器的行为,那么你需要创建自己的kafka.producer.Partitioner接口实现。

    我建议在创建自己的策略时要非常小心,并且确实要对其进行大量测试并监控您的主题及其分区。

    【讨论】:

      【解决方案2】:

      更新:这个答案在 2014 年是正确的,但最新版本的 Kafka 可以通过控制台生产者生成键/值对。见belowanswers

      kafka-console-producer.sh 不支持开箱即用地向特定分区生成消息。

      但是,更新脚本以传递一个额外的分区 ID 参数,然后在自定义分区器中处理它应该非常简单,如 @Chiron 在 kafka.tools.ConsoleProducer 类的修改版本中的帖子中所述。

      查看源代码:

      https://apache.googlesource.com/kafka/+/refs/heads/trunk/bin/kafka-console-producer.sh https://apache.googlesource.com/kafka/+/refs/heads/trunk/core/src/main/scala/kafka/tools/ConsoleProducer.scala

      【讨论】:

      • 这似乎不再正确,请参阅我的评论。
      【解决方案3】:

      到目前为止,ConsoleProducer 似乎支持将键控消息写入主题。 Kafka 将使用密钥的哈希将消息分配到分区中,至少使用默认行为。

      目前,默认分隔符为\t,因此输入key[\t]message 会将其分布到分区中:

      key1    a-message
      

      可以通过提供key.separator配置来更改分隔符,例如:

      kafka-console-producer --broker-list localhost:9092,localhost:9093 \
        --topic mytopic --property key.separator=,
      

      这样发送消息:

      key2,another-message
      

      我已经成功地使用默认选项卡和自定义分隔符对此进行了测试。消息被分发到两个单独的分区。

      【讨论】:

      • 它也对我有用。值也可以包含分隔符。它只是寻找第一个分隔符位置并将记录分成两个标记
      • 如何根据分区获取console消费者中的消息?
      • 考虑使用kafkacat
      • kafka-console-consumer 接受 print.key 属性。您还可以自定义字符串以分隔输出中的键和值:key.separator。 --property print.key=true --property key.separator="-"
      【解决方案4】:

      根据目前的情况(Kafka>=0.10.0.1),kafka-console-producer.sh脚本和底层的ConsoleProducer java类支持带key发送数据,但是这种支持默认是禁用的,有从 CLI 启用。

      即,您需要设置属性parse.key。此外,如果您想使用不同于制表符的字符,请使用 Cedric 的回答中指定的 key.separator

      最后,命令行是:

      kafka-console.producer.sh --broker-list kafka:9092,kafka2:9092 \
          --topic $TOPIC --property parse.key=true --property key.separator=|
      

      【讨论】:

        【解决方案5】:
        C:\arunsingh\demo\kafka_2.13-2.4.0\bin\windows>kafka-console-producer.bat --broker-list 127.0.0.1:9094 --topic arun_topic --property parse.key=true --property key.separator=, --producer-property acks=all
        >myKey1, Message with key
        >myKey2, Message with key 2
        >
        

        【讨论】:

        • 但是我们如何在存储的消息中也包含密钥(最坏的情况我可以复制它)
        猜你喜欢
        • 1970-01-01
        • 2018-03-23
        • 2021-11-25
        • 2016-06-13
        • 2021-11-09
        • 2020-01-27
        • 2018-02-09
        • 2020-08-21
        • 1970-01-01
        相关资源
        最近更新 更多