【发布时间】:2017-05-04 14:26:25
【问题描述】:
这是我的问题,关于 Kafka:
我有两个程序与 Kafka 交换数据(一个正在生产,另一个正在读取)。让我们假设客户崩溃。生产者会不断发送消息,一段时间后我们会重启客户。
使用我们当前的代理设置,以下是应该发生的情况:
-如果崩溃发生在不到 1 天前(因为 offsets.retention.minutes 为 1440),则检索偏移量,并处理等待消息。
-如果崩溃发生在 1 天前,客户的新偏移量将被重置为最早(因为 auto.offset.reset 是最早的)。问题是:如果某些消息已经被处理(1 天到 7 天之间),它们将再次被处理,因为 Kafka 保存了 7 天的消息(log.retention.hours 为 168)。
解决方案是否像将 offsets.retention.minutes 和 log.retention.hours 设置为相同的值一样简单(当然,进行转换分钟小时) ?还是会产生一些我错过的副作用?一个更简单的解决方案是只删除已经处理过的消息,但似乎 Kafka 不能这样做。
感谢阅读。
【问题讨论】:
标签: java apache-kafka