【问题标题】:Java Start multiple CompletableFuture asynchronously and wait for them to finishJava 异步启动多个 CompletableFuture 并等待它们完成
【发布时间】:2021-12-13 15:24:23
【问题描述】:

我有一个客户列表,我想从外部 api 为其收集数据。 我想启动多个线程来收集数据并等待所有线程完成,如果每个线程在特定时间内没有完成,我想将其保存在数据库中。 我正在使用CompletableFuture.allOf

我的代码是这样的

    public void fetchDataForAllClients() {
        String previousDate = DateUtils.getPreviousDate();
        List<Integer> clientIdList = PropertiesUtil.getClientIdList();

        CompletableFuture.allOf(clientIdList.stream()
                        .map(clientId -> fetchData(previousDate, clientId)
                                .exceptionally(e -> {
                                    LOGGER.error(e.getMessage(), e);
                                    return null;
                                })
                                .thenAcceptAsync(s -> System.out.println(s + ". FetchDataThread Finished for "+ clientId + " at " + LocalDateTime.now())))
                        .toArray(CompletableFuture<?>[]::new))
                .join();
    }

    @Async
    CompletableFuture<Integer> fetchData(final String date, final Integer clientId) {
        counter++;
        System.out.println(counter + ". FetchDataThread Started for "+ clientId + " at " + LocalDateTime.now());
        boolean failed = false;
        String errorMsg = null;
        try {
            myApiService.fetchDataForClient(clientId, date, date);
        } catch (MyApiException exception) {
            failed = true;
            errorMsg = exception.getMessage();
        }
        fetchStatsService.createFetchStats(clientId, date, failed, errorMsg);
        return CompletableFuture.completedFuture(counter);
    }

这个问题是它不会在异步中启动fetchData(previousDate, clientId)。它按顺序运行。

【问题讨论】:

  • 问题是您的流正在调用#fetchData,它执行大量同步代码,然后将已经检索到的结果包装在CompletableFuture 中。对于几乎整个方法体,您需要类似于 CompletableFuture#supplyAsync 的东西,这将是生成 Supplier&lt;Integer&gt; 以供 CompletableFuture 运行。

标签: java multithreading completable-future


【解决方案1】:

@Aync 如果从同一个类中调用它将不起作用,因为它将调用原始方法而不是被拦截的方法,因此将 fetchData 方法更改为返回 Integer 然后使用实际生成新线程的 compleatableFuture.supplyAsync() 调用该方法执行那个方法

   List<CompleatbleFutures> futures= clientIdList.stream()
    .map(id->CompleatbleFutures.supplyAsync(fetchdata(..)))
    .collect(Collectors.toList());
    CompleatbleFuture.allOf(futures.toArray(futures.size));

【讨论】:

    猜你喜欢
    • 2010-10-11
    • 1970-01-01
    • 2014-09-20
    • 1970-01-01
    • 1970-01-01
    • 2021-05-11
    • 1970-01-01
    • 2018-11-08
    • 1970-01-01
    相关资源
    最近更新 更多