【问题标题】:Wait for Multiple Spring WebClient Mono Responses等待多个 Spring WebClient Mono 响应
【发布时间】:2020-02-14 21:01:09
【问题描述】:

我正在尝试在微服务应用程序中调用外部服务以并行获取所有响应并在开始其他计算之前将它们组合起来。我知道我可以在每个 Mono 对象上使用 block() 调用,但这会破坏使用反应式 api 的目的。是否可以并行启动所有请求并将它们合并到一个点。

示例代码如下。在这种情况下,在实际响应出现之前打印“完成”。我也知道订阅调用是非阻塞的。

我希望在收集所有响应后打印“完成”,因此需要某种阻止。但是不想阻止每个请求

final List<Mono<String>> responseOne = new ArrayList<>();
    IntStream.range(0, 10).forEach(i -> {
        Mono<String> responseMono =
                WebClient.create("https://jsonplaceholder.typicode.com/posts")
                        .post()
                        .retrieve()
                        .bodyToMono(String.class)
                ;
        System.out.println("create mono response lazy initialization");
        responseOne.add(responseMono);
    });

    Flux.merge(responseOne).collectList().subscribe( res -> {

        System.out.println(res);
    });
    System.out.println("Done");

根据建议,我想出了这个似乎对我有用的方法。

StopWatch watch = new StopWatch();
    watch.start();
    final List<Mono<String>> responseOne = new ArrayList<>();
    IntStream.range(0, 10).forEach(i -> {
        Mono<String> responseMono =
                WebClient.create("https://jsonplaceholder.typicode.com/posts")
                        .post()
                        .retrieve()
                        .bodyToMono(String.class);
        System.out.println("create mono response lazy initialization");
        responseOne.add(responseMono);
    });
    CompletableFuture<List<String>> futureCount = new CompletableFuture<>();
    List<String>  res = new ArrayList<>();
    Mono.zip(responseOne, Arrays::asList)
            .flatMapIterable(objects -> objects) // make flux of objects
            .doOnComplete(() -> {
                futureCount.complete(res);
            }) // will be printed on completion of the flux created above
            .subscribe(responseString -> {
                        res.add((String) responseString);
                    }
            );

    watch.stop();
    List<String> response = futureCount.get();
    System.out.println(response);
    // do rest of the computation
    System.out.println(watch.getLastTaskTimeMillis());

【问题讨论】:

    标签: spring-webflux project-reactor


    【解决方案1】:
    1. 如果您希望您的调用是并行的,最好使用Mono.zip
    2. 现在,您希望在收集所有响应后打印Done

    所以,你可以修改你的代码如下

    final List<Mono<String>> responseMonos = IntStream.range(0, 10).mapToObj(
            index -> WebClient.create("https://jsonplaceholder.typicode.com/posts").post().retrieve()
                .bodyToMono(String.class)).collect(Collectors.toList()); // create iterable of mono of network calls
    
    Mono.zip(responseMonos, Arrays::asList) // make parallel network calls and collect it to a list
            .flatMapIterable(objects -> objects) // make flux of objects
            .doOnComplete(() -> System.out.println("Done")) // will be printed on completion of the flux created above
            .subscribe(responseString -> System.out.println("responseString = " + responseString)); // subscribe and start emitting values from flux
    

    在响应式代码中显式调用 subscribeblock 也不是一个好主意。

    【讨论】:

      【解决方案2】:

      是否可以并行启动所有请求并将它们合并到一个点。

      这正是您的代码已经在做的事情。如果您不相信我,请在您的bodyToMono() 电话之后坚持.delayElement(Duration.ofSeconds(2))。您会看到您的列表在 2 秒多后打印出来,而不是 20 秒(如果按顺序执行 10 次会是这样。)

      组合部分发生在您的Flux.merge().collectList() 调用中。

      在这种情况下,在实际响应出现之前打印“完成”。

      这是意料之中的,因为您的最后一次System.out.println() 调用是在响应式回调链之外执行的。如果您希望在打印列表后打印“完成”(您在传递给您的 subscribe() 调用的消费者中混淆了变量名称 s),那么您需要将其放入该消费者中,而不是在外面。

      如果您正在与命令式 API 交互,因此需要在 list 上进行阻止,您可以这样做:

      List&lt;String&gt; list = Flux.merge(responseOne).collectList().block();

      ...它仍然会并行执行调用(因此仍然会为您带来一些优势),但随后会阻塞,直到所有调用都完成并组合成一个列表。 (但是,如果您只是将反应堆用于这种类型的用途,那么它是否值得值得商榷。)

      【讨论】:

      • 我希望在收集完所有响应后打印“完成”,因此需要某种阻止。但是不想阻止每个请求。
      • @Mkp 你不需要阻止来做到这一点 - 正如我所说,如果你只是把你的 System.out.println("Done"); 呼叫直接放在 System.out.println(res); 之后,它会在所有内容被收集后打印而不会阻塞.听起来您可能正在与命令式 API 进行交互,因此如果这是您的用例,我会在答案的末尾添加一点。
      • Mono.zip.flatMapIterableFlux.merge 有什么区别?
      猜你喜欢
      • 2019-11-04
      • 1970-01-01
      • 2019-10-21
      • 1970-01-01
      • 1970-01-01
      • 2019-10-01
      • 1970-01-01
      • 1970-01-01
      • 2021-01-08
      相关资源
      最近更新 更多