【发布时间】:2017-08-17 05:41:46
【问题描述】:
我正在使用 KafkaMessageListenerContainer 从 kafka 主题中消费,我有一个应用程序逻辑来处理每个记录,它也依赖于其他微服务。我现在在处理每条记录后手动提交偏移量。
但如果我的应用程序逻辑失败,我需要寻找失败的偏移量并继续处理它,直到它成功。为此,我需要对最后一个偏移量进行运行时手动查找。
KafkaMessageListenerContainer 可以做到吗?
【问题讨论】:
-
您在应用程序逻辑之后提交偏移量。因此,如果您在应用程序逻辑失败的情况下不提交偏移量,则偏移量将不会向前移动,您将再次处理相同的消息。这能解决您的问题吗?
-
@yaswanth 不,我猜它不像那样工作。在开始测试之前,我的假设与您相似。我启用了 ENABLE_AUTO_COMMIT_CONFIG - false 和 AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE 的容器属性与 AcknowledgeingMessageListener。我将记录 1 和记录 2 发送到主题,记录 1- 应用程序逻辑失败(我不回复),此时下一次消费者轮询应该再次获取记录 1,但我得到记录 2。如果这是配置参数修复的问题请告诉我!
-
你是对的!到目前为止,我一直处于错误的假设之中。
标签: apache-kafka kafka-consumer-api spring-kafka