【发布时间】:2020-02-14 21:01:09
【问题描述】:
我正在尝试在微服务应用程序中调用外部服务以并行获取所有响应并在开始其他计算之前将它们组合起来。我知道我可以在每个 Mono 对象上使用 block() 调用,但这会破坏使用反应式 api 的目的。是否可以并行启动所有请求并将它们合并到一个点。
示例代码如下。在这种情况下,在实际响应出现之前打印“完成”。我也知道订阅调用是非阻塞的。
我希望在收集所有响应后打印“完成”,因此需要某种阻止。但是不想阻止每个请求
final List<Mono<String>> responseOne = new ArrayList<>();
IntStream.range(0, 10).forEach(i -> {
Mono<String> responseMono =
WebClient.create("https://jsonplaceholder.typicode.com/posts")
.post()
.retrieve()
.bodyToMono(String.class)
;
System.out.println("create mono response lazy initialization");
responseOne.add(responseMono);
});
Flux.merge(responseOne).collectList().subscribe( res -> {
System.out.println(res);
});
System.out.println("Done");
根据建议,我想出了这个似乎对我有用的方法。
StopWatch watch = new StopWatch();
watch.start();
final List<Mono<String>> responseOne = new ArrayList<>();
IntStream.range(0, 10).forEach(i -> {
Mono<String> responseMono =
WebClient.create("https://jsonplaceholder.typicode.com/posts")
.post()
.retrieve()
.bodyToMono(String.class);
System.out.println("create mono response lazy initialization");
responseOne.add(responseMono);
});
CompletableFuture<List<String>> futureCount = new CompletableFuture<>();
List<String> res = new ArrayList<>();
Mono.zip(responseOne, Arrays::asList)
.flatMapIterable(objects -> objects) // make flux of objects
.doOnComplete(() -> {
futureCount.complete(res);
}) // will be printed on completion of the flux created above
.subscribe(responseString -> {
res.add((String) responseString);
}
);
watch.stop();
List<String> response = futureCount.get();
System.out.println(response);
// do rest of the computation
System.out.println(watch.getLastTaskTimeMillis());
【问题讨论】:
标签: spring-webflux project-reactor