【问题标题】:Flux collectList() on list of WebClient exchanges always emptyWebClient 交换列表上的 Flux collectList() 始终为空
【发布时间】:2020-04-27 20:15:46
【问题描述】:

我正在尝试使用WebClient 执行列表请求,然后过滤它们以找到第一个成功的请求(如果有)并返回。或者如果不成功则回退到默认响应。

我面临的问题是,当我在 Flux<ServerResponse> 上调用 .collectList() 时,列表始终为空。根据我之前发出的请求数,我预计该列表将包含 NServerResponse

public Mono<ServerResponse> retry(ServerRequest request) {
    return Flux.fromIterable(request.headers().header(SEQUENCE_HEADER_NAME))
            .map(URI::create)
            // Build a "list" of responses
            .flatMap(uri -> webClientBuilder.baseUrl(uri.toString()).build()
                    .method(Objects.requireNonNull(request.method()))
                    .headers(headers -> request.headers().asHttpHeaders().forEach((key, values) -> {
                        if (!SEQUENCE_HEADER_NAME.equals(key)) {
                            headers.addAll(key, values);
                        }
                    }))
                    .body(BodyInserters.fromDataBuffers(request.body(BodyExtractors.toDataBuffers())))
                    .exchange()
                    .flatMap(clientResponse -> ServerResponse.status(clientResponse.statusCode())
                            .headers(headers -> headers.addAll(clientResponse.headers().asHttpHeaders()))
                            .body(BodyInserters.fromDataBuffers(clientResponse.body(BodyExtractors.toDataBuffers()))))
            )
            // "Wait" for all of them to complete so we can filter
            .collectList()
            .flatMap(clientResponses -> {
                List<ServerResponse> filteredResponses = clientResponses.stream()
                        .filter(response -> response.statusCode().is2xxSuccessful())
                        .collect(Collectors.toList());

                if (filteredResponses.isEmpty()) {
                    log.error("No request succeeded; defaulting to {}", HttpStatus.BAD_REQUEST.toString());
                    return ServerResponse.badRequest().build();
                }

                if (filteredResponses.size() > 1) {
                    log.error("Multiple requests succeeded; defaulting to {}", HttpStatus.BAD_REQUEST.toString());
                    return ServerResponse.badRequest().build();
                }

                return Mono.just(filteredResponses.get(0));
            });
}

知道为什么.collectList() 总是返回一个空列表吗?

【问题讨论】:

  • Flux.fromIterable(request.headers().header(SEQUENCE_HEADER_NAME)) -- 不知道您的问题,但您不应该以这种方式开始您的流程。此外,您不应该使用Flux 来触发一系列WebClient Monos
  • 您能否详细说明为什么不以这种方式开始流程?我猜我需要先单独构造WebClientMonos,然后将它们与.onSuccess()/onError() 链接在一起,这就是你的意思吗?

标签: spring-webflux project-reactor reactor


【解决方案1】:

好吧,在我看来,您有一个令人困惑的要求,因为您想要响应的 First Mono 但您试图将该功能放入 Flux 中,这意味着处理流程中的所有项目有效率的。 Webflux 中的Mono 旨在创建一个流,该流将有效地对流中的项目执行一系列转换。您要求为第一个成功的 URI 测试一堆 URI 并没有什么是 WebFlux 的好处,所以我不得不质疑为什么要尝试将其强制到框架中。

您可能会争辩说Flux 为您提供了更好的异步处理,但我认为当它是一堆WebClient 调用时情况并非如此。 WebClient 在底层仍然是 HTTP,因此流程中的每个项目都会在 WebClient 周围停止和开始。如果你想异步执行 HTTP,你应该使用 ThreadPoolCallable

【讨论】:

  • 所以,如果我有一个 Mono 列表,并且我想在获取所有响应时做一些事情,我使用 Flux.fromIterable()....collectlist().subscribe(x->myList .add(x)) 它失败了,为什么?
  • 可能是因为您有一个不正确的 Monos Flux。使用 Mono.zip 来一次处理多个 Mono。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-11-12
  • 2019-04-19
  • 2021-04-13
  • 1970-01-01
  • 2020-05-03
  • 1970-01-01
相关资源
最近更新 更多