【问题标题】:Fail Fast When Kafka Consumer Can't ConnectKafka 消费者无法连接时快速失败
【发布时间】:2021-05-15 05:27:03
【问题描述】:

出于我自己的目的,如果 Kafka Consumer 无法连接到代理,我需要停止 Spring Boot 应用程序。 我的意思是,当 Kafka Consumer 尝试池化消息时,我们可以看到以下日志:

[Consumer clientId=consumer-ddddd-1, groupId=ddddd] Bootstrap broker localhost:9094 (id: -1 rack: null) disconnected
[Consumer clientId=consumer-ddddd-1, groupId=ddddd] Connection to node -1 (localhost/127.0.0.1:9094) could not be established. Broker may not be available.

主题或代理不可用时的标准行为。结果 - 应用程序不会停止。但我需要。

我正在尝试添加以下属性,但它不起作用:

spring.kafka.consumer.fetch-max-wait=1000
spring.kafka.admin.fail-fast=true
spring.kafka.session.timeout.ms=1000

一般来说,我希望得到如下行为:如果消费者无法连接 - 关闭应用程序

  • Spring Boot 版本:2.3.8.RELEASE
  • Kafka:spring-kafka-starter

Kafka 轮询示例:

consumer.poll(Duration.ofMinutes(5));

【问题讨论】:

  • 如果没有响应,您希望轮询在 1 秒后失败?如果是这种情况,则此调用 consumer.poll(Duration.ofMinutes(5)); 将永远不会发生,因为您在此处设置了 5 分钟的超时。你可以简单地用consumer.poll(1000); 运行它,看看它是否有效。 poll方法调用时需要设置超时时间,所以不需要额外配置超时时间。
  • 我研究了如何做到这一点的方法,你是对的。最好的方法是超越持续时间。我现在将这个问题的答案写出来,感谢您的参与!

标签: java spring spring-boot apache-kafka fail-fast


【解决方案1】:

作为其中一种方法 - 我们可以从上面的评论中使用。或者我刚刚根据以下代码创建了验证并且它可以工作

    public void validate() {
        try {
            consumer.listTopics(Duration.ofSeconds(10));
        } catch (TimeoutException e) {
            logger.error("Topics doesn't exist OR unavailable broker");
            System.exit(1);
        }
    }
    

【讨论】:

    猜你喜欢
    • 2019-03-14
    • 2014-04-09
    • 1970-01-01
    • 1970-01-01
    • 2018-09-29
    • 2018-01-27
    • 2018-08-25
    • 2017-08-07
    • 1970-01-01
    相关资源
    最近更新 更多