【问题标题】:How to use Spring Reactive WebSocket and transform it into the Flux stream?如何使用 Spring Reactive WebSocket 并将其转换为 Flux 流?
【发布时间】:2017-11-01 23:26:32
【问题描述】:

Spring documentation 上有一些 WebSocketClient 示例:

WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute("ws://localhost:8080/echo"), session -> {...}).blockMillis(5000);

我不确定如何处理传入数据流? 在该块内{...}

我的意思是:如何过滤传入的数据并将其转换为 Flux?

这就是我想要的。

@GetMapping("/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<MyRecourse> getStreaming() {

    //  get some data from WebSocket (CoinCap service).
    //  Transform that data into MyRecourse object
    //  Return stream to a client 

}

【问题讨论】:

    标签: spring-websocket spring-webflux


    【解决方案1】:

    看看 WebSocketHandler.handle() lambda 的 WebSocketSession 参数:

    /**
     * Get the flux of incoming messages.
     */
    Flux<WebSocketMessage> receive();
    

    更多信息请参见Spring WebFlux Workshop

    更新

    让我们试试这个!

        Mono<Void> sessionMono =
                client.execute(new URI("ws://localhost:8080/echo"),
                        session ->
                                Mono.empty()
                                        .subscriberContext(Context.of(WebSocketSession.class, session))
                                        .then());
    
        return sessionMono
                .thenMany(
                        Mono.subscriberContext()
                                .flatMapMany(c -> c
                                        .get(WebSocketSession.class)
                                        .receive()))
                .map(WebSocketMessage::getPayloadAsText);
    

    更新 2

    或其他选项,但订阅被阻止:

        EmitterProcessor<String> output = EmitterProcessor.create();
    
        client.execute(new URI("ws://localhost:8080/echo"),
                session ->
                        session.receive()
                                .map(WebSocketMessage::getPayloadAsText)
                                .subscribeWith(output)
                                .then())
                .block(Duration.ofMillis(5000));
    
        return output;
    

    更新 3

    正在工作的 Spring Boot 应用程序:https://github.com/artembilan/webflux-websocket-demo

    主要代码如下:

        EmitterProcessor<String> output = EmitterProcessor.create();
    
        Mono<Void> sessionMono =
                client.execute(new URI("ws://localhost:8080/echo"),
                        session -> session.receive()
                                .map(WebSocketMessage::getPayloadAsText)
                                .subscribeWith(output)
                                .then());
    
        return output.doOnSubscribe(s -> sessionMono.subscribe());
    

    【讨论】:

    • 我不明白如何将 Flux 转换为 Flux 并返回结果流。
    • 嗯。我想知道它与原始请求有何关系……请在此处具体说明您的问题。无论如何,只有Flux.map() 可以将WebSocketMessage 转换为您的MyObj
    • Artem 是对的,不同的问题。 Spring WebFlux 目前仅支持原始 websocket(尚不支持 STOMP 和更高级别的消息传递抽象)。使用原始 websocket,您必须为此处理自己的“协议”。
    • 是的,但是如何从 Flux 获取 Fluxclient.execute(uri, session -&gt; session.receive().map(webSocketMessage -&gt; { return Flux.just(webSocketMessage.getPayloadAsText()); })); 错误:(67, 61) java: 不兼容的类型: lambda 表达式中的错误返回类型不存在类型变量 V,T 的实例,因此 reactor.core.publisher.Flux 符合reactor.core.publisher.Mono
    • session.receive().map(WebSocketMessage::getPayloadAsText)
    猜你喜欢
    • 1970-01-01
    • 2020-01-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-25
    • 2023-03-19
    • 2021-02-06
    • 2017-05-14
    相关资源
    最近更新 更多