【问题标题】:How to Gracefully Turn Off Running Kafka Consumer如何优雅地关闭正在运行的 Kafka Consumer
【发布时间】:2019-02-27 22:40:24
【问题描述】:

我需要根据一些数据库驱动的属性打开/关闭 Kafka 消费者。怎么可能实现。

我想到的一种方法是:当消费者标志关闭时,从消费者那里抛出异常。容器工厂配置定义为

factory.setErrorHandler(new SeekToCurrentErrorHandler());

但它会主动寻找相同的信息。

有什么方法可以关闭心跳,然后根据需要重新打开。

【问题讨论】:

    标签: apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    您可以stop()start() 监听器容器。

    您使用的是@KafkaListener,因为您使用的是容器工厂。

    这样的话

    @KafkaListener(id = "foo" ...)
    

    然后使用KafkaListenerEndpointRegistry bean ...

    registry.getListenerContainer("foo").stop();
    

    【讨论】:

    • 在我的情况下,我必须按照 Gary 的建议自动装配这个 bean 以使用注册表 @Autowired @Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME) private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    • @GaryRussell:你能解释一下取消订阅和关闭之间的区别吗?消费者再平衡是否发生在两者中?如果是的话,它是否会对使用 seek 方法()手动寻找特定偏移量产生性能影响?
    • 您不应该在 cmets 中提出新的、不相关的问题;它不能帮助人们找到问题和答案。总是问一个新问题。 unsubscribe()close() 都会导致重新平衡。不同之处在于您可以根据需要多次订阅/取消订阅,而不会完全摧毁消费者。我在猜测,但我希望寻求比通过取消订阅和重新订阅来强制进行 2 次重新平衡更有效。
    • @GaryRussell 再次感谢,下次我会确保发布新问题。只是在想它可能对其他人没有那么大的价值。
    【解决方案2】:

    Consumer 有几个 API 来控制其状态:

    • pause()/resume():允许停止/恢复从一组分区消费。消费者保持订阅状态(因此没有重新平衡),但在恢复之前不会获取任何新记录

    • unsubscribe():允许更改消费者订阅,如果没有订阅任何内容,它将保持与集群的连接。

    如果您“完成”了消费者,您还可以致电close() 并在需要时开始一个新的

    【讨论】:

    • 您能否指出一些有关 pause() 和 resume() 内部工作的资源。例如,在 pause() 中请求从其组中离开消费者,然后在 resume() 中请求加入消费者组。
    • 不,pause() 只是停止对给定分区的消费,但消费者留在组中(没有重新平衡)。消费者只有在close() 或当他们没有心跳时才离开他们的群组
    • @MickaelMaison :你能解释一下取消订阅和关闭之间的区别吗?消费者再平衡是否发生在两者中?如果是的话,它是否会对使用 seek 方法()手动寻找特定偏移量产生性能影响?
    • close() 完全断开客户端,而unsubscribe() 保持客户端连接。如果客户端保持关闭或取消订阅,将触发重新平衡以将其分区重新分配给其他成员。但是有一些细微差别在评论中有点难以解释,特别是自从 Kafka 2.3 具有静态成员资格 (kafka.apache.org/documentation/#static_membership)。
    猜你喜欢
    • 2017-10-25
    • 2017-06-19
    • 2018-11-01
    • 1970-01-01
    • 1970-01-01
    • 2014-12-01
    • 2014-11-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多