【发布时间】:2021-12-13 15:24:23
【问题描述】:
我有一个客户列表,我想从外部 api 为其收集数据。
我想启动多个线程来收集数据并等待所有线程完成,如果每个线程在特定时间内没有完成,我想将其保存在数据库中。
我正在使用CompletableFuture.allOf
我的代码是这样的
public void fetchDataForAllClients() {
String previousDate = DateUtils.getPreviousDate();
List<Integer> clientIdList = PropertiesUtil.getClientIdList();
CompletableFuture.allOf(clientIdList.stream()
.map(clientId -> fetchData(previousDate, clientId)
.exceptionally(e -> {
LOGGER.error(e.getMessage(), e);
return null;
})
.thenAcceptAsync(s -> System.out.println(s + ". FetchDataThread Finished for "+ clientId + " at " + LocalDateTime.now())))
.toArray(CompletableFuture<?>[]::new))
.join();
}
@Async
CompletableFuture<Integer> fetchData(final String date, final Integer clientId) {
counter++;
System.out.println(counter + ". FetchDataThread Started for "+ clientId + " at " + LocalDateTime.now());
boolean failed = false;
String errorMsg = null;
try {
myApiService.fetchDataForClient(clientId, date, date);
} catch (MyApiException exception) {
failed = true;
errorMsg = exception.getMessage();
}
fetchStatsService.createFetchStats(clientId, date, failed, errorMsg);
return CompletableFuture.completedFuture(counter);
}
这个问题是它不会在异步中启动fetchData(previousDate, clientId)。它按顺序运行。
【问题讨论】:
-
问题是您的流正在调用
#fetchData,它执行大量同步代码,然后将已经检索到的结果包装在CompletableFuture中。对于几乎整个方法体,您需要类似于CompletableFuture#supplyAsync的东西,这将是生成Supplier<Integer>以供 CompletableFuture 运行。
标签: java multithreading completable-future