【发布时间】:2017-08-14 11:07:45
【问题描述】:
我们的系统中实现了kafka流处理,用于事务处理。解决方案实现如下,
Kafka生产者向kafka主题发布事件,流处理器处理输入事件并执行聚合操作。流处理后,事件将发布到另一个主题。由于第一个主题中没有实现消费者,我如何从第一个主题中删除已处理的消息。
【问题讨论】:
标签: apache-kafka apache-kafka-streams
我们的系统中实现了kafka流处理,用于事务处理。解决方案实现如下,
Kafka生产者向kafka主题发布事件,流处理器处理输入事件并执行聚合操作。流处理后,事件将发布到另一个主题。由于第一个主题中没有实现消费者,我如何从第一个主题中删除已处理的消息。
【问题讨论】:
标签: apache-kafka apache-kafka-streams
没有办法手动从 kafka 中删除消息(没有黑客攻击磁盘上的数据,AFAIK)。您只有 3 个选项:
使用基于时间的保留策略(例如让 kafka 自动删除所有超过 1 小时的消息)
使用基于存储的保留策略(让 kafka 将主题大小保持为某个预定义值)
使用主题压缩策略 - 让 kafka 保留最新版本的密钥。所有旧版本的密钥都将被删除(压缩)。
正如 Luciano Afranllie 所述 - 您无需手动删除消息。您可以处理消息并让 kafka 根据您的策略管理主题。
【讨论】:
考虑到您的流处理链是第一个主题的消费者。如果您出于某种原因需要重新处理原始数据(例如,如果您意识到流处理逻辑中存在错误),您可能希望在处理完原始消息后仍可在第一个主题上使用它们。
因此,您无需删除邮件,您必须针对该主题设置适合您需求的保留政策。权衡通常是数据可用的时间量与所需的存储量。
【讨论】:
有一个 Kafka 改进提案 (KIP) 可以为这个用例添加这个功能。
目前所有用于删除消息的 Scala 代码都在 0.11 Kafka 中,并且已经过测试可以正常工作
https://github.com/apache/kafka/pull/2476
但是,在 Java AdminClient API 和文档中添加此功能尚未完成。
【讨论】: