【问题标题】:Spring webclient exchangeToFlux() not making HTTP requestSpring webclient exchangeToFlux()不发出HTTP请求
【发布时间】:2021-10-26 11:44:47
【问题描述】:

我正在使用 Spring WebFlux 5.3.6 的 WebClient 流式传输来自 REST 端点的响应,该端点生成 text/csv 内容。

我可以像这样使用retrieve()responseSpec.bodyToFlux 流式传输仅正文

WebClient.ResponseSpec responseSpec = headersSpec.retrieve();
        Flux<DataBuffer> dataBufferFlux = responseSpec.bodyToFlux(DataBuffer.class);
        DataBufferUtils
                .write(dataBufferFlux, outputStream)
                .blockLast(Duration.of(20, ChronoUnit.SECONDS));

我想获取内容类型标头并将其作为测试的一部分进行验证。上面的代码仅提供对响应正文的访问,而不是对标头的访问。

我尝试改用exchangeToFlux() 来获得更多控制权并访问响应标头,但我看到的是从未发出过HTTP 请求。如果我向myResponse.setStatus(clientResponse.rawStatusCode()); 添加断点,它永远不会被命中。

下面是更完整的代码示例。我一直在努力寻找任何使用 DataBuffer 将结果流回的 exchangeToFlux 示例。

HttpClient httpClient = HttpClient.create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
            .responseTimeout(Duration.ofMillis(5000))
            .doOnConnected(conn ->
                    conn.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS))
                            .addHandlerLast(new WriteTimeoutHandler(5000, TimeUnit.MILLISECONDS)));

    WebClient webClient = WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .build();
    WebClient.RequestHeadersSpec<?> headersSpec = webClient
            .get()
            .uri("http://localhost:8080/v1/users")
            .header(CONTENT_TYPE, "text/csv");


    MyResponse<T> myResponse = new MyResponse<>();
    CountDownLatch latch = new CountDownLatch(1);
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

    headersSpec.exchangeToFlux(clientResponse -> {
        // Never enters here!
        myResponse.setStatus(clientResponse.rawStatusCode());
        myResponse.setContentType(clientResponse.headers().contentType());
        latch.countDown();

        if (clientResponse.statusCode() == HttpStatus.OK) {
            Flux<DataBuffer> dataBufferFlux = clientResponse.bodyToFlux(DataBuffer.class);
            DataBufferUtils
                    .write(dataBufferFlux, outputStream)
                    .blockLast(Duration.of(20, ChronoUnit.SECONDS));

            return dataBufferFlux;
        }

        return Flux.empty();
    });

    latch.await();
    return myResponse;

【问题讨论】:

    标签: spring webclient reactive flux


    【解决方案1】:

    好像您没有订阅从headersSpec.exchangeToFlux 返回的 Flux。

     headersSpec.exchangeToFlux(clientResponse -> {
            // Never enters here!
            myResponse.setStatus(clientResponse.rawStatusCode());
            myResponse.setContentType(clientResponse.headers().contentType());
            latch.countDown();
    
            if (clientResponse.statusCode() == HttpStatus.OK) {
                Flux<DataBuffer> dataBufferFlux = clientResponse.bodyToFlux(DataBuffer.class);
                DataBufferUtils
                        .write(dataBufferFlux, outputStream)
                        .blockLast(Duration.of(20, ChronoUnit.SECONDS));
    
                return dataBufferFlux;
            }
    
            return Flux.empty();
        })
        .subscribe(); // <- Subscribe is missing.
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-09-19
      • 2022-01-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-03-30
      • 2023-03-08
      • 1970-01-01
      相关资源
      最近更新 更多