【发布时间】:2019-10-08 11:22:42
【问题描述】:
我创建了一个 Spring Boot 应用程序来将消息发布到 Kafka 队列。为此,我使用 spring cloud stream 和 Kafka binder 作为依赖项。问题是我的应用程序会不断尝试连接到 Kafka 代理,如果它由于默认配置而关闭 2 分钟。
我使用以下属性减少了时间并将其设置为 1000 毫秒并获得超时异常
spring.kafka.properties.request.timeout.ms:1000。
但是,我的 spring 应用程序仍然在异常之后运行。如果无法连接到 Kafka 代理,我希望它失败。我已经为 spring.kafka.admin.fail-fast=true 尝试了另一个属性,但应用程序仍在运行。
我还尝试搜索 spring cloud stream 和 Kafka binder 的一些属性,如果 Kafka 代理不可用但找不到与此相关的任何内容,我可以将其设置为使我的应用程序失败。
请帮帮我。
异常日志见下文。
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:351)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:325)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:302)
... 33 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
2019-05-22 06:06:25 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'outputBindingLifecycle'
2019-05-22 06:06:25 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 2147482647
2019-05-22 06:06:25 [main] DEBUG o.s.c.s.binding.BindableProxyFactory - Binding inputs for :interface kafka.stream.RXXXStreams
2019-05-22 06:06:25 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'inputBindingLifecycle'
2019-05-22 06:06:25 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 2147483547
2019-05-22 06:06:25 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
2019-05-22 06:06:25 [main] DEBUG o.s.b.a.l.ConditionEvaluationReportLoggingListener -
【问题讨论】:
-
在应用启动时,如果kafka队列不可用,则根本不会启动。但是如果应用程序已经启动并且在一段时间后如果队列关闭,它不会杀死应用程序。即使您将应用程序捆绑为 jar,它内部也会使用嵌入式 tomcat,它作为 webapp 而不是作为独立程序运行
-
就我而言,Kafka 队列在应用程序启动之前就已关闭,这就是我测试上述场景的方式。
-
好的,在这种情况下,应用程序不应该启动。可以分享你用来连接kafka队列的代码吗?
-
我正在为此使用 spring 云流属性。 PSB 相同的
spring.cloud.stream.kafka.binder.brokers=${kafkaBrokers} spring.cloud.stream.bindings.output.destination=${kafkaTopicName}
标签: spring-boot spring-kafka spring-cloud-stream