【问题标题】:Expiring the messages in Kafka Topic使 Kafka 主题中的消息过期
【发布时间】:2020-05-26 05:22:05
【问题描述】:

我们在开发环境中使用 Apache Kafka 执行负载测试。
在我们安装了 confluent kafka 的 Linux 机器中,空间有限,因此为了执行负载测试,我们在主题中添加了retention.ms 属性。

想法是在消息被消费者消费后从主题中删除。

我试过了

kafka-topics --zookeeper localhost:2181 --alter --topic myTopic --config retention.ms=10000

它不起作用,因此我们重新创建了主题并尝试了以下选项。

kafka-configs --alter --zookeeper localhost:2181 --entity-type topics --entity-name myTopic -add-config retention.ms=10000 

在运行该进程几个小时后,代理由于空间限制而关闭。

我可以从主题以及代理的角度尝试哪些其他选项来可靠地使消息过期并收回磁盘空间以进行长时间运行的负载测试。

【问题讨论】:

    标签: apache-kafka


    【解决方案1】:

    除时间外,您还可以根据字节大小定义删除策略。

    主题配置称为retention.bytes,在文档中描述为:

    如果我们使用“删除”保留策略,此配置控制分区(由日志段组成)在我们丢弃旧日志段以释放空间之前可以增长到的最大大小。默认情况下没有大小限制,只有时间限制。由于此限制是在分区级别强制执行的,因此将其乘以分区数以计算主题保留(以字节为单位)。

    您可以将它与retention.ms 一起设置,无论首先达到什么限制(字节或时间),都会触发清理。

    【讨论】:

      【解决方案2】:

      这可能是因为您的日志清理线程可能尚未触发。

      您没有提供太多关于这些主题累积了多少数据的信息。但它可能不在 GB 中。

      日志清理线程将清理已完成的日志段。段的默认大小为 1 GB。

      如果您预计负载很大,请将您的主题配置 segment.bytes 修改为较小的值。

      或 根据您的要求将配置 segment.ms 修改为 1 分钟或 10 分钟。

      这应该会创建段,并根据您的日志保留时间,清理线程将清理较旧的已完成段。

      【讨论】:

      • 在 4 小时的运行中,我们将在主题中积累 200 GB 或更多价值的消息..
      • 检查 kafka 日志中的段以及是否触发了日志清理线程。此外,您可以检查与日志清理线程 (log.cleaner.*) 相关的其他配置。日志和这些配置应该可以帮助我们调试为什么没有发生保留。