【问题标题】:Kafka message size with activated compression激活压缩的 Kafka 消息大小
【发布时间】:2021-02-18 06:34:38
【问题描述】:

我对 Kafka 2.6.0 中的消息大小配置有点困惑。但是让我们讲故事吧:

我们正在使用由 3 个节点组成的 Kafka 集群。到目前为止,消息的标准配置。 “zstd 压缩”已激活。

相关的broker配置很简单:

compression.type=zstd

此时生产者配置也很简单:

compression.type=zstd

现在我们想将 8 MB 的消息放入特定主题。此数据压缩后的大小仅为 200 KB。

如果我将此数据放入主题中,则会发生以下错误:

sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new\ 2.txt

[2020-11-05 13:43:34,500] ERROR Error when sending message to topic XXX with key: null, value: 8722456 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 8722544 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

所以我像这样更改了生产者配置:

compression.type=zstd
max.request.size=10485760

现在生产者接受更大的消息。但是还是不行:

sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new\ 2.txt

[2020-11-05 15:10:01,513] ERROR Error when sending message to topic Komsa.Kafka.Test with key: null, value: 8722544 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

这是另一个错误消息。我不明白为什么会这样。

我认为此消息与“message.max.bytes”属性有关。但我不明白为什么。这是此属性的文档:

Kafka 允许的最大记录批量大小(如果启用压缩,则在压缩之后)。如果增加了这个值并且有超过 0.10.2 的消费者,那么消费者的获取大小也必须增加,以便他们可以获取这么大的记录批次。在最新的消息格式版本中,为了提高效率,记录总是被分组为批次。在以前的消息格式版本中,未压缩的记录不会分组到批次中,并且此限制仅适用于在这种情况下的单个记录。这可以使用主题级别 max.message.bytes 配置为每个主题设置。

我认为这意味着该参数与压缩后的消息大小有关,大约为千字节。

有人可以帮我吗?

【问题讨论】:

    标签: apache-kafka kafka-producer-api


    【解决方案1】:

    我找到了解决办法:

    问题是 kafka-console-producer.sh 忽略了生产者配置中的 compression.type。如果我明确调用

    sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --compression-codec=zstd --broker-list broker < kafka/new\ 2.txt
    

    使用 compression.codec=zstd 它可以工作,因为生产者压缩了消息。

    【讨论】:

      【解决方案2】:

      我们的经验是,如果您像之前那样在代理级别设置压缩类型

      compression.type=zstd
      

      代理将解压缩来自生产者的任何内容,并使用该压缩类型再次压缩数据。即使生产者已经使用了 zstd,也会进行解压和“重新压缩”。

      因此,您需要将代理级别的compression.type 设置为producer

      【讨论】:

      • compression.type=zstd 此配置位于代理站点和生产者站点。它不在主题网站上。
      • 对不起,造成混乱。答案保持不变。
      • 我已经把broker的配置改成compression.type=producer 错误依旧。
      • 您是否也重新启动了代理?
      • 而且这个话题也是broker重启后创建的?我猜topic的compression.type也需要设置为“producer”。
      猜你喜欢
      • 2018-10-19
      • 1970-01-01
      • 2019-09-20
      • 1970-01-01
      • 2015-03-06
      • 2017-08-11
      • 1970-01-01
      • 1970-01-01
      • 2019-04-10
      相关资源
      最近更新 更多