【发布时间】:2020-05-25 19:24:37
【问题描述】:
我有一个运行状况线程,它每 5 秒从我的工作应用程序检查一次 Kafka 集群的状态。但是,我时不时会收到TimeoutException:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
我也有工具可以从外部监视我的集群(Cruise Control、Grafana),但它们都没有指出集群中的任何问题。此外,我的工作应用程序不断消耗消息,似乎没有一个失败。
为什么我偶尔会收到此超时?如果经纪人没有关闭,那么我认为我的配置中的某些内容已关闭。我将超时设置为 5 秒,这似乎绰绰有余。
我的 AdminClient 配置:
@Bean
public AdminClient adminClient() {
return KafkaAdminClient.create(adminClientConfigs());
}
public Map<String, Object> adminClientConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
return props;
}
我如何检查集群(我比在代理列表上运行逻辑):
@Autowired
private AdminClient adminClient;
private void addCluster() throws ExecutionException, InterruptedException {
adminClient.describeCluster().nodes().get().forEach(node -> brokers.add(node.host()));
}
【问题讨论】:
标签: java apache-kafka spring-kafka