【问题标题】:springboot2 +webflux + websocketspringboot2 +webflux + websocket
【发布时间】:2019-06-12 09:15:21
【问题描述】:

我在 JDK 11 上使用带有 Webflux 的 Spring boot 2。我编写了以下配置类:

@Configuration
public class WebSocketConfiguration {

    @Autowired
    @Bean
    public HandlerMapping webSocketMapping(final MyWebSocketHandler server) {
        final Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/echo", server);

        final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
        mapping.setUrlMap(map);
        return mapping;
    }
    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

还有下面的WebSocketHandler方法:

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    return webSocketSession.send(webSocketSession.receive().
            map(msg -> webSocketSession
                    .textMessage("response:jack ->" + msg.getPayloadAsText())));
}

现在,我可以接收我发送的任何内容,例如:

客户端发送:4545

客户端接收 :response:jack ->4545

我想知道当客户端不给我发消息的时候,我怎么给客户端推送消息,我需要随时推送消息!

我如何随时发送自定义消息而不是使用相同的输入消息进行响应?

【问题讨论】:

    标签: spring-boot websocket spring-webflux


    【解决方案1】:

    您可以在我的博客文章http://kojotdev.com/2019/08/spring-webflux-websocket-with-vue-js/ 中了解它。

    您需要将您的WebSocketHandler 更改为:

    private final GreetingsPublisher greetingsPublisher;
    private final Flux<String> publisher;
    
    public ReactiveWebSocketHandler(GreetingsPublisher greetingsPublisher) {
        this.greetingsPublisher = greetingsPublisher;
        this.publisher = Flux.create(greetingsPublisher).share();
    }
    
    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        final Flux<WebSocketMessage> message = publisher
                .map(greetings -> webSocketSession.textMessage(greetings));
    
        return webSocketSession.send(message);
    }
    

    并添加GreetingPublisher

    @Component
    public class GreetingsPublisher implements Consumer<FluxSink<String>> {
        private static final Logger log = LoggerFactory.getLogger(GreetingsPublisher.class);
    
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private final Executor executor = Executors.newSingleThreadExecutor();
    
        public boolean push(String greeting) {
            return queue.offer(greeting);
        }
    
        @Override
        public void accept(FluxSink<String> sink) {
            this.executor.execute(() -> {
                while (true) {
                    Try.of(() -> {
                        final String greeting = queue.take();
                        return sink.next(greeting);
                    })
                            .onFailure(ex -> log.error("Could not take greeting from queue", ex));
    
                }
            });
        }
    }
    

    它是一个 bean,所以无论你注入它并调用 push 方法,它都会使用 WebSocket 发送消息。例如:

    @Controller
    public class GreetingsController {
    
        private final GreetingsPublisher greetingsPublisher;
    
        public GreetingsController(GreetingsPublisher greetingsPublisher) {
            this.greetingsPublisher = greetingsPublisher;
        }
    
        @Bean
        RouterFunction<ServerResponse> pushMessage() {
            return route(GET("/push"),
                    request -> {
                        greetingsPublisher.push("Send a new message with WebSocket");
                        return ServerResponse.ok().body(fromObject("websocket message sent"));
                    });
        }
    }
    

    先连接WebSocket,在localhost:8080/push上打开浏览器。应该发送消息。

    请注意,这似乎是 Spring Boot 2.1.7 的一个错误,我在我的博文中提到过。

    【讨论】:

      猜你喜欢
      • 2020-02-09
      • 2018-09-06
      • 2020-03-15
      • 2022-08-06
      • 2020-03-12
      • 1970-01-01
      • 2018-09-06
      • 2019-04-10
      • 2020-11-05
      相关资源
      最近更新 更多