【问题标题】:Wait for kafkaTemplate pending future等待 kafkaTemplate 等待未来
【发布时间】:2020-08-11 13:33:45
【问题描述】:

我正在执行一个异步操作,它在循环中返回一个未来对象(比如 10 条消息)。 据我了解,回调方法会在 Future 完成其任务时自动触发并执行。

假设我的第 7 个未来处于未决阶段。我该如何异常完成这个未来?

以及处理这种情况的最佳方法是什么。

List<ListenableFuture<SendResult<String, String>>> cf = new ArrayList<ListenableFuture<SendResult<String, String>>>();

future = kafkaTemplate.send(topicName, message);
cf.add(future);

i++;

future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

    @Override
    public void onSuccess(SendResult<String, String> result) {
        syso("sent success");
    }

    @Override
    public void onFailure(Throwable ex) {
        System.out.println(" sending failed");
    }
});

【问题讨论】:

    标签: java apache-kafka future spring-kafka kafka-producer-api


    【解决方案1】:

    您为什么要这样做?

    如果出现异常,kafka-clients会调用模板的回调异常,模板会异常完成future。

    如果您出于某种原因确实需要这样做(但我想了解原因),您可以将其转换为 SettableListenerFuture

    【讨论】:

    • 我的意思是假设我发送了 10 条消息,并且由于某种原因第 7 个未来处于待处理状态,那么我的应用程序会发生什么?它会无限期地等待第 7 个未来完成吗?这甚至是一个可能的情况?
    • 它不应该发生,但它可能会发生(我对kafka-clients 的了解不够)但是您可以遍历期货并使用future.get(10, TimUnit.SECONDS),如果未来是未在该时间内完成(成功或异常);我永远不会推荐在 any 未来没有超时的情况下使用get()。如果您根本不调用任何 get,则应用程序根本不会等待。
    • 我之所以要这样做是因为我想在消息处理完成后发送电子邮件
    • 所以get(...) 超时可能是您的最佳解决方案。
    猜你喜欢
    • 2021-03-26
    • 2015-01-07
    • 2020-06-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-09
    • 1970-01-01
    • 2019-10-28
    相关资源
    最近更新 更多