【发布时间】:2018-05-20 20:22:54
【问题描述】:
我正在使用 spring kafka 1.2.2.RELEASE。目前我已经为没有 BackOffPolicy 和 AlwaysRetryPolicy 的容器配置了重试模板。确认模式是 MANUAL_IMMEDIATE。
当一个 SIGTERM 时,我会让当前消息被处理,当 @KafkaListener 再次被新消息调用时,我在容器上抛出 RuntimeException ,该容器无限期地重试并连续抛出异常。一段时间后发出 SIGKILL 并停止容器(我认为有更好的方法)。但是在这个过程中,重试的消息在消费者重新启动后被检索,但在没有调用 KafkaListener 的情况下被提交。请参阅下面的堆栈跟踪中的 offset=13
堆栈跟踪:
[20 May 2018 22:37:20] [ INFO] [] [ConsumerCoordinator onJoinComplete]:[262 ] - Setting newly assigned partitions [messages-0] for group listener
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer$2 onPartitionsAssigned]:[513 ] - Committing on assignment: {messages-0=OffsetAndMetadata{offset=13, metadata=''}}
[20 May 2018 22:37:20] [ INFO] [] [AbstractMessageListenerContainer$2 onPartitionsAssigned]:[278 ] - partitions assigned:[messages-0]
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer run]:[632 ] - Received: 0 records
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer run]:[632 ] - Received: 1 records
[20 May 2018 22:37:20] [TRACE] [] [KafkaMessageListenerContainer$ListenerConsumer doInvokeWithRecords]:[931 ] - Processing ConsumerRecord(topic = messages, partition = 0, offset = 13, CreateTime = 1526855737241, serialized key size = 31, serialized value size = 2032, headers = RecordHeaders(headers = [], isReadOnly = false), key = "some key", value = "some random data")
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer ackImmediate]:[749 ] - Committing: {messages-0=OffsetAndMetadata{offset=14, metadata=''}
当我看到发出 SIGTERM 时,是否有更好的方法来停止容器,因此不会使用消息调用 @KafkaListener。我知道 spring-kafka 的更高版本(> 2.0.0)具有可以阻止所有消费者的 KafkaListenerEndpointRegistry。但目前无法升级到 2.0.0。
非常感谢任何帮助。
【问题讨论】:
标签: spring-kafka