【问题标题】:Kafka Compaction for topic主题的 Kafka 压缩
【发布时间】:2017-08-13 02:53:23
【问题描述】:

我使用kafka-topics.bat --zookeeper localhost:2181 --alter --topic test --config cleanup.policy=compact delete config min.cleanable.dirty.ratio=0.01 --config segment.ms=100 --config delete.retention.ms=100 来压缩我的主题。 我已经使用相同的密钥发送了 2000 条消息。当我使用这些消息时,我会分别收到每条消息,而不是一条压缩消息。

【问题讨论】:

  • 你是怎么解决这个问题的?

标签: apache-kafka


【解决方案1】:

您所指的压缩设置与您使用 Kafka 客户端使用消息的方式无关。请查看official documentation here 了解更多详情。

如果您想控制您的客户端如何使用消息,您必须使用 client config properties 配置您的客户端。

假设您将主题集中 300 毫秒并接收一组消息 (ConsumerRecords),然后您可以迭代这些消息以独立处理每条消息。

while(true) {
   ConsumerRecords<String, JsonNode> records = kafkaConsumer.poll(300);
       if(records.count() > 0) {
          for(ConsumerRecord<String, JsonNode> record: records) {
             if(counter % 500 == 0) {
                 log.info("Record recovered, groupId: {}, topicName: {}, key: {}, value: {} , offset: {}",
                 this.groupId, this.topicNames, record.key(), record.value(), record.offset());
                    }
                }
            }
        }

【讨论】:

  • 假设我们有类似这样的 key=value1 key=value2 现在 value2 是 key 的更新值,我只想要更新的值!如何配置 Kafka 以仅保留更新的值?
  • 您好,您不能只保留最新信息,这不是 kafka 的工作方式,它是一个仅附加的分布式日志,对于您提到的内容,您需要使用 kafka 流和 Ktable 或 GlobalKTable据我记得现在还不确定,是在 kafka 0.10.1 或 0.10.0 中引入的……confluent.io/blog/…kafka.apache.org/documentation/streams#streams_kstream_ktable
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-09-14
  • 1970-01-01
  • 2019-04-22
  • 2019-09-20
  • 2020-12-02
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多