【问题标题】:Handling Kafka timeout exception Java处理 Kafka 超时异常 Java
【发布时间】:2017-09-07 08:26:50
【问题描述】:

我在我的应用程序中使用 Kafka,我想在 kafka 发送消息期间处理超时异常。接下来我尝试做:在回调中抛出运行时异常并在方法调用中捕获它。

这是 kafka 调用:

public final class ManualKafkaBetsProducer implements IKafkaBetsProducer
{
    @Autowired
    private KafkaTemplate<ReportsBetKey, IReportsBet> template;

    public void send(IReportsBet bet)
    {
        template.send(MessageBuilder.withPayload(bet).setHeader(KafkaHeaders.TOPIC, kafkaBetsTopicName)
                .setHeader(KafkaHeaders.MESSAGE_KEY, new ReportsBetKey(bet)).build())
                .addCallback(res -> logger.debug("Message {} send OK"), ex -> {
                    throw new RuntimeException(ex);
                });
    }
}

这是方法调用:

try
{
    manualKafkaBetsProducer.send(bet);
}
catch (Exception ex)
{
     return TransactionResultType.GENERAL_ERROR.name();
}

问题是:当超时expeption发生时,它并没有进入catch块。当发生其他一些错误时,它可以正常工作。 任何想法如何处理这个? 谢谢。

【问题讨论】:

    标签: java apache-kafka spring-kafka


    【解决方案1】:

    您的send 方法返回ListenableFuture&lt;SendResult&gt;

    如果你想在发送失败的情况下在发送线程上获得一个异常,你可以等待这个future完成(通过调用get())并检查结果。

    如果您希望发送线程等待,callback 很有用。它让您有机会稍后让另一个线程获取结果。

    【讨论】:

    • 如果你不阻塞,代码会继续运行,你不会得到异常。你不必马上阻塞,你可以稍后检查Future,但是一旦你需要知道是否有异常,你必须等待结果。
    • 您还可以配置 Kafka 生产者最多阻塞多长时间 (max.block.ms)。然后您不必等待太久(但这也可能会影响生产者的工作 - 有时会出现您想要覆盖的网络故障)。
    猜你喜欢
    • 2019-07-13
    • 1970-01-01
    • 2017-01-07
    • 2019-07-10
    • 2017-01-24
    • 2011-02-17
    • 2015-02-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多