【发布时间】:2020-05-24 05:36:06
【问题描述】:
在我的 Kafka 流应用程序中,我有一个处理器,它计划每 60 秒产生一次输出消息。输出消息由来自单个输入主题的消息构建。有时会发生输出消息大于代理的配置限制(默认为 1MB)。抛出异常并关闭应用程序。提交间隔设置为默认值(60 秒)。
在这种情况下,我希望在下一次运行中,在崩溃前 60 秒内消耗的所有消息都会被重新消耗。但实际上这些消息的偏移量已提交,并且在下次运行时不会再次处理这些消息。
阅读similar questions 的答案,在我看来,不应提交偏移量。当我将提交间隔增加到 120 秒(处理器仍然每 60 秒打断一次)时,它会按预期工作并且不会提交偏移量。
我正在使用默认处理保证,但我也尝试过exactly_once。两者的结果相同。从处理器调用context.commit() 似乎对此问题没有影响。
我在这里做错了吗?
【问题讨论】:
标签: apache-kafka apache-kafka-streams