【问题标题】:How message broadcast to subscribers in ConnectableFluxConnectableFlux 中如何向订阅者广播消息
【发布时间】:2020-02-13 15:14:59
【问题描述】:

我是 Spring WebFlux 的新手,我在此链接中找到了使用 Spring WebFlux 和 Websocket 的聊天演示:

https://blog.monkey.codes/how-to-build-a-chat-app-using-webflux-websockets-react/

根据文章,它说:

应用程序的关键是将 WebSocketSessions 连接到一个 其他。这是通过连接传入的消息流来实现的 与全球发布者的每个会话。另一方面,每次会议 订阅由全局发布者生成的消息。

反应堆配置:

    @Bean
    public UnicastProcessor<Event> eventPublisher(){
        return UnicastProcessor.create();
    }

    @Bean
    public Flux<Event> events(UnicastProcessor<Event> eventPublisher) {
        return eventPublisher
                .replay(25)
                .autoConnect();
    }

    @Bean
    public HandlerMapping webSocketMapping(UnicastProcessor<Event> eventPublisher, Flux<Event> events) {
        Map<String, Object> map = new HashMap<>();
        map.put("/websocket/chat", new ChatSocketHandler(eventPublisher, events));
        SimpleUrlHandlerMapping simpleUrlHandlerMapping = new SimpleUrlHandlerMapping();
        simpleUrlHandlerMapping.setUrlMap(map);

        //Without the order things break :-/
        simpleUrlHandlerMapping.setOrder(10);
        return simpleUrlHandlerMapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }

聊天套接字处理程序:

@Override
public Mono<Void> handle(WebSocketSession session) {
    WebSocketMessageSubscriber subscriber = new WebSocketMessageSubscriber(eventPublisher);
    session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(this::toEvent)
            .subscribe(subscriber::onNext, subscriber::onError, subscriber::onComplete);
    return session.send(outputEvents.map(session::textMessage));
}

private static class WebSocketMessageSubscriber {
    private UnicastProcessor<Event> eventPublisher;
    private Optional<Event> lastReceivedEvent = Optional.empty();

    public WebSocketMessageSubscriber(UnicastProcessor<Event> eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    public void onNext(Event event) {
        lastReceivedEvent = Optional.of(event);
        eventPublisher.onNext(event);
    }

    public void onError(Throwable error) {
        //TODO log error
        error.printStackTrace();
    }

    public void onComplete() {
        lastReceivedEvent.ifPresent(event -> eventPublisher.onNext(
                Event.type(USER_LEFT)
                        .withPayload()
                        .user(event.getUser())
                        .build()));
    }

}

如果一个用户发送消息,那么其他用户可以看到此消息,因为它已广播给所有用户。但是当我阅读代码时,我不明白 ConnectableFlux(事件)如何将消息发送到所有 Web 套接字会话。我还尝试评论 UserStats 类的代码,但它仍然可以正常工作。谁能给我解释一下?

【问题讨论】:

    标签: spring spring-webflux


    【解决方案1】:

    通过 Websocket 从服务器向浏览器发送消息的代码是:

    session.send(outputEvents.map(session::textMessage))
    

    换句话说,只要一个事件(消息)被推送到 outputEvents 通量上,它就会通过链接到会话的 Websocket 发送到浏览器。

    它像广播一样起作用的原因是,为每个新的 WebSocketSession 创建的每个 WebSocketMessageSubscriber 都会将它接收到的所有内容(从浏览器到服务器)发布到 eventPublisher 上,从而使其在 outputEvents Flux 上可见。

      public void onNext(Event event) {
        lastReceivedEvent = Optional.of(event);
        eventPublisher.onNext(event);
      }
    

    outputEvents Flux 在所有 WebSocketMessageSubscribers 之间共享,并且通过此 Flux 接收到的所有事件都通过 Websocket 发送回浏览器,包括从该 Websocket 接收到的事件。

    下图中的“Global Message Stream”代表了outputEvents Flux。

    【讨论】: