【问题标题】:Kafka message codec - compress and decompressKafka 消息编解码器 - 压缩和解压
【发布时间】:2016-05-25 11:09:55
【问题描述】:

使用kafka时,我可以通过设置我的kafka生产者的kafka.compression.codec属性来设置编解码器。

假设我在生产者中使用 snappy 压缩,当使用一些 kafka 消费者消费来自 kafka 的消息时,我应该做些什么来解码来自 snappy 的数据还是它是 kafka 消费者的一些内置功能?

relevant documentation 中,我找不到任何与 kafka 消费者中的编码相关的属性(它只与生产者有关)。

有人可以解决这个问题吗?

【问题讨论】:

    标签: compression apache-kafka


    【解决方案1】:

    根据我的理解,解压缩由消费者自己负责。正如他们的官方wiki页面中提到的 The consumer iterator transparently decompresses compressed data and only returns an uncompressed message

    this文章中发现消费者的工作方式如下

    消费者拥有后台“提取器”线程,这些线程连续从代理中批量提取 1MB 的数据,并将其添加到内部阻塞队列中。消费者线程从这个阻塞队列中取出数据,解压缩并遍历消息

    并且在End-to-end Batch Compression下的文档页面中也写到了

    可以将一批消息聚集在一起压缩并以这种形式发送到服务器。这批消息会以压缩的形式写入,并在日志中保持压缩状态,只会被消费者解压。

    看来解压缩部分是在消费者自己处理的,您需要做的就是在创建生产者时使用compression.codec ProducerConfig 属性提供有效/支持的压缩类型。我找不到任何例子或解释,它说明了消费者端的任何减压方法。如果我错了,请纠正我。

    【讨论】:

      【解决方案2】:

      我对 v0.8.1 有同样的问题,除了说消费者应该“透明地”解压缩它从未做过的压缩数据之外,Kafka 中的这种压缩解压缩记录很少。

      在 Kafka 网站中使用 ConsumerIterator 的示例高级消费者客户端仅适用于未压缩的数据。一旦我在 Producer 客户端中启用压缩,消息就永远不会进入以下“while”循环。希望他们应该尽快解决这个问题,或者他们不应该声称这个功能,因为一些用户可能使用 Kafka 来传输需要批处理和压缩功能的大尺寸消息。

      ConsumerIterator <byte[], byte[]> it = stream.iterator();
      while(it.hasNext())
      {
         String message = new String(it.next().message());
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2018-10-17
        • 1970-01-01
        • 2023-03-09
        • 2021-11-30
        • 1970-01-01
        • 2011-06-23
        • 2019-01-29
        相关资源
        最近更新 更多