【问题标题】:Kafka producer future get timeoutKafka生产者未来超时
【发布时间】:2021-06-04 23:38:26
【问题描述】:

我正在尝试处理失败,以防生产者无法向 Kafka 发送消息:

try {
    Future<RecordMetadata> res = producer.send(new ProducerRecord<>(topic, msg.key(), msg));
    log.info("Waiting for confirmation from kafka for message : \n {}",msg.toString());
    record = res.get(30,TimeUnit.SECONDS);
    log.info("Successfully produced message : msg")
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    log.error("The following message wasnt sent to kafka because of an error : {}", msg.toString(), e);
}

当我尝试向不存在的主题生成消息时,我看到以下错误:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic just_fake_topic  not present in metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1307) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:962) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:750) ~[kafka-clients-2.5.0.jar!/:na]
at com.xx.xx.produceMessage.handleMessage(produceMessage.java:27) ~[classes!/:0.0.1-SNAPSHOT]
at 
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
    Caused by: org.apache.kafka.common.errors.TimeoutException: Topic just_fake_topic not present in metadata after 60000 ms.

我在 60 秒后得到超时,而不是我在未来对象的 get 方法中配置的 30 秒。

我还尝试在我的 kafka.properties 中配置 metadata.max.idle.ms=30000max.block.ms=30000,因为我也收到了大约 5 分钟的以下警告,但没有帮助:

2020-06-28 14:31:26.009  WARN 10291 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 695 : {just_fake_topic =UNKNOWN_TOPIC_OR_PARTITION}

知道为什么吗?

【问题讨论】:

  • 也许可以试试producer.flush(),或者specify Kafka configs
  • 就我而言,我只是启动了一个生产者实例,然后尝试发送一条消息,其中的主题不存在。仅此而已..
  • 设置max.block.ms 应该可以工作。即使您设置了较小的max.block.ms 值,生产者直到 60 秒才返回? @JeyJ
  • 确实如此。我将它设置为 10000,但它仍然在 60 秒后返回。此外,future.get(10,TimeUnit.SECONDS) 实际上并没有花费 10 秒

标签: java apache-kafka


【解决方案1】:

异常可能是从 Kafka 网络线程抛出的。

https://jaceklaskowski.gitbooks.io/apache-kafka/content/kafka-producer-internals-Sender.html

也许提供一个回调作为发送的第二个参数。 注意:回调将在kafka网络线程上运行。

Kafka Producer : Handle Exception in Async Send with Callback

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-07-21
    • 1970-01-01
    • 2023-03-03
    • 1970-01-01
    • 1970-01-01
    • 2016-04-11
    • 1970-01-01
    • 2016-12-12
    相关资源
    最近更新 更多