【问题标题】:2 ways streaming using spring webflux使用 spring webflux 流式传输的 2 种方式
【发布时间】:2021-11-12 15:19:40
【问题描述】:

我想知道是否可以使用 Spring Webflux 实现两种流式传输方式? 基本上,我希望让客户端发送服务器接收到的数据流,将它们映射到字符串,然后返回结果,所有这些都流利地进行,而无需收集数据。 我使用 RSocket 做到了,但我想知道是否可以使用 http 2.0(使用 Spring 和 Project-Reactor)获得相同的结果。

尝试这样做:

1- 客户:

  public Mono<Void> stream() {
    var input = Flux.range(1, 10).delayElements(Duration.ofMillis(500));
    return stockWebClient.post()
            .uri("/stream")
            .body(BodyInserters.fromPublisher(input, Integer.class))
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(String.class)
            .log()
            .then();
  }

2- 服务器:

@PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public Flux<String> stream(@RequestBody Integer i) {
    return Flux.range(i, i+10).map(n -> String.valueOf(i)).log();
  }

或者:

public Flux<String> stream(@RequestBody Flux<Integer> i) {
    return i.map(n -> String.valueOf(i)).log();
  }

或者:

@PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public Flux<String> stream(@RequestBody List<Integer> i) {
    return Flux.fromIterable(i).map(n -> String.valueOf(i)).log();
  }

没有一个工作正常。

【问题讨论】:

  • 你可以使用 websockets 做到这一点

标签: spring spring-webflux project-reactor http2


【解决方案1】:

如果你想使用服务器发送事件,你需要返回一个Flux&lt;ServerSentEvent&lt;String&gt;&gt;

所以你的服务器方法应该是:

    @PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> stream(@RequestBody Integer i) {
        return Flux.range(i, i + 10).map(n -> ServerSentEvent.builder(String.valueOf(n)).build());
    }

但在这种情况下,主体只是一个整数,您的客户端代码变为:

input.flatMap(i ->
        stockWebClient
                .post()
                .uri("/stream")
                .bodyValue(i)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})
                .mapNotNull(ServerSentEvent::data)
                .log())
                .blockLast();

您也可以对功能端点执行相同的操作。

如果您希望能够将数据从客户端流式传输到服务器并返回,您将无法使用 SSE,但您可以使用 websocket 来实现。

你需要一个 HandlerMapping 和一个 WebSocketHandler

public class TestWebSocketHandler implements WebSocketHandler {
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Flux<WebSocketMessage> output = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(Integer::parseInt)
                .concatMap(i -> Flux.range(i, i + 10).map(String::valueOf))
                .map(session::textMessage);
        return session.send(output);
    }
}

处理程序的配置:

@Bean
    public TestWebSocketHandler myHandler() {
        return new TestWebSocketHandler();
    }

    @Bean
    public HandlerMapping handlerMapping(final TestWebSocketHandler myHandler) {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/streamSocket", myHandler);
        int order = -1; // before annotated controllers
        return new SimpleUrlHandlerMapping(map, order);
    }

在客户端:

var input2 = Flux.range(1, 10).delayElements(Duration.ofMillis(500));
        WebSocketClient client = new ReactorNettyWebSocketClient();
        client.execute(URI.create("http://localhost:8080/streamSocket"), session ->
                session.send(input2.map(i -> session.textMessage("" + i))).then(session.receive().map(WebSocketMessage::getPayloadAsText).log().then())
        ).block();

【讨论】:

    猜你喜欢
    • 2019-12-03
    • 2018-01-13
    • 2013-01-14
    • 2018-06-24
    • 2022-01-21
    • 2023-01-31
    • 2018-11-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多