【问题标题】:spring kafka 1.2.2 gracefull shutdownspring kafka 1.2.2 优雅关机
【发布时间】:2018-05-20 20:22:54
【问题描述】:

我正在使用 spring kafka 1.2.2.RELEASE。目前我已经为没有 BackOffPolicy 和 AlwaysRetryPolicy 的容器配置了重试模板。确认模式是 MANUAL_IMMEDIATE。

当一个 SIGTERM 时,我会让当前消息被处理,当 @KafkaListener 再次被新消息调用时,我在容器上抛出 RuntimeException ,该容器无限期地重试并连续抛出异常。一段时间后发出 SIGKILL 并停止容器(我认为有更好的方法)。但是在这个过程中,重试的消息在消费者重新启动后被检索,但在没有调用 KafkaListener 的情况下被提交。请参阅下面的堆栈跟踪中的 offset=13

堆栈跟踪:

[20 May 2018 22:37:20] [ INFO] [] [ConsumerCoordinator  onJoinComplete]:[262 ] - Setting newly assigned partitions [messages-0] for group listener
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer$2 onPartitionsAssigned]:[513 ] - Committing on assignment: {messages-0=OffsetAndMetadata{offset=13, metadata=''}}
[20 May 2018 22:37:20] [ INFO] [] [AbstractMessageListenerContainer$2 onPartitionsAssigned]:[278 ] - partitions assigned:[messages-0]
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer        run]:[632 ] - Received: 0 records
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer        run]:[632 ] - Received: 1 records
[20 May 2018 22:37:20] [TRACE] [] [KafkaMessageListenerContainer$ListenerConsumer doInvokeWithRecords]:[931 ] - Processing ConsumerRecord(topic = messages, partition = 0, offset = 13, CreateTime = 1526855737241, serialized key size = 31, serialized value size = 2032, headers = RecordHeaders(headers = [], isReadOnly = false), key = "some key", value = "some random data")
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer ackImmediate]:[749 ] - Committing: {messages-0=OffsetAndMetadata{offset=14, metadata=''}

当我看到发出 SIGTERM 时,是否有更好的方法来停止容器,因此不会使用消息调用 @KafkaListener。我知道 spring-kafka 的更高版本(> 2.0.0)具有可以阻止所有消费者的 KafkaListenerEndpointRegistry。但目前无法升级到 2.0.0。

非常感谢任何帮助。

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    端点注册表自 1.0 以来一直存在。 1.x用户建议升级到最新的1.3.x;见here

    从侦听器停止容器时,最好在新线程上执行此操作,否则停止会延迟。

    请参阅 2.1.x ContainerStoppingErrorHandler 了解如何执行此操作。但是,当然,您不需要在停止后抛出异常。

    但在 1.x 中,您将需要丢弃所有已获取的后续消息。

    【讨论】:

    • 谢谢加里。我刚刚发现注册表在 1.3.5 中可用。例如,我的侦听器很慢,仍在处理消息。我发出一个 sigterm 来停止所有容器(在另一个使用注册表的线程中)。当我重新启动时,我没有收到相同的消息。你介意分享你如何克服这个问题吗(我试图不承认消息,但仍然是一样的)。如何丢弃KafkaListener中的消息
    • 我发现了问题。我正在使用recordFilterStrategy,在开始处理消息之前,我将此记录添加到元数据存储中,这导致了这种异常行为。应该早点检查。抱歉,谢谢。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-06-12
    • 2019-03-10
    • 2017-06-19
    • 2019-09-26
    • 1970-01-01
    • 2020-08-11
    相关资源
    最近更新 更多