【问题标题】:Spring web socket messaging - subscribe and send initial messageSpring web socket 消息传递 - 订阅和发送初始消息
【发布时间】:2017-06-21 19:16:40
【问题描述】:

使用用于 Web 套接字消息传递的 Stomp 代理中继,我可以订阅目标 /topic/mydest。这将创建一个代理订阅,并接收系统中的某些东西为此代理目标触发的所有消息,这会在系统中的某些事件发生时发生。

我可以订阅一个目的地/app/mydest,一个带有@SubscribeMapping("mydest") 的控制器方法将被调用,返回值只在这个套接字上作为消息发送回来。据我所知,这是唯一会为此订阅发送的消息。

有没有办法将这些合并到一个订阅中,即为某个/topic 目的地创建一个代理订阅,触发一些代码,直接将消息发送回订阅者?

用例:当系统发生错误时,将带有当前错误计数的消息发送到/topic/mydest。当一个新客户订阅时,我只想向他发送最后一个已知的错误计数。其他人暂时不感兴趣,因为计数没有改变。

我当前的解决方案是同时订阅/app/mydest/topic/mydest 并在客户端上使用相同的消息处理程序。但它确实是一个合乎逻辑的订阅,而且有点容易出错,因为客户需要记住同时订阅两者。

在这种情况下我的问题是:/app/ 订阅是否还会有更多消息?有什么可以触发的吗?在不向现有订阅者发送冗余消息的情况下,我还能如何将初始信息发送给某个主题的订阅者?

根据要求,这是我的 Websocket 配置类。

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/queue/", "/topic/", "/exchange/");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

【问题讨论】:

  • 也许我应该澄清一下:从实际的角度来看,我描述的当前解决方案已经足够好了。我提出这个问题的主要动机是充分掌握订阅用户目的地的意图,以及对代理目的地的订阅采取行动的方式。

标签: spring stomp spring-websocket spring-messaging


【解决方案1】:

您可以使用ApplicationListenerSessionSubscribeEvent。 示例:

@Component
public class SubscribeListener implements ApplicationListener<SessionSubscribeEvent> {

    private final SimpMessagingTemplate messagingTemplate;

    @Autowired
    public SubscribeListener(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    @Override
    public void onApplicationEvent(SessionSubscribeEvent event) {
        messagingTemplate.convertAndSendToUser(event.getUser().getName(), "/topic/mydest", "Last known error count");
    }
}

【讨论】:

  • 我会试试的。但这不需要额外订阅/user/topic/mydest吗?
  • 不,只是订阅/topic/mydest。用我的应用测试 - 工作正常
  • 它对我不起作用。使用SessionSubscribeEvent 采取行动的想法是一个有价值的提示,我很感激。但是向用户目标 /topic/mydest 发送消息不会在新订阅到常规代理 目标/topic/mydest 时收到。坦率地说,我在文档中的任何地方都没有看到任何暗示支持这一假设。当然,我可以在事件监听器中做messagingTemplate.convertAndSend("/topic/mydest") 并向每个订阅者发送消息。虽然这不会对那种信息造成真正的伤害,但这不是我想要做的。
  • @rainerfrey 您能否更新问题并提供您的 WebSocket 配置?
  • 如果这适用于任何人,请发送 github 链接。
【解决方案2】:

您可以监听会话订阅事件并发送初始消息

@Component
@RequiredArgsConstructor
public class WebSocketEventListener {

private static final Logger logger = LoggerFactory.getLogger(WebSocketEventListener.class);
private final SimpMessagingTemplate simpMessagingTemplate;


@EventListener
public void handleSessionSubscribeEvent(SessionSubscribeEvent event) {
    logger.info("Subscribed to session: " + event);
    Principal user = event.getUser();
    if (user instanceof UsernamePasswordAuthenticationToken) {
        UsernamePasswordAuthenticationToken token = (UsernamePasswordAuthenticationToken) user;
        if (token.getPrincipal() instanceof UserDetails) {
            UserDetails userDetails = (UserDetails) token.getPrincipal();
            simpMessagingTemplate.convertAndSendToUser(userDetails.getUsername(), "/queue/notify", "Hello");
        }
    }
}

}

【讨论】:

    【解决方案3】:

    我想我找到了解决您问题的方法:

    您必须订阅用户特定的主题。 在我的示例中,我创建了一个主题 /topic/progress

    我订阅/user/topic/progress

    stompClient.subscribe('/user/topic/progress', progressMessage => {
          ...
    })
    

    我创建了一个组件来侦听SessionSubscribeEvent,以便对新订阅做出反应:

    @Component
    public class WebSocketEventListener {
    
        @Autowired
        private WebSocketService webSocketService;
    
        @Autowired
        private ProgressService progressService;
    
        @EventListener
        public void handleWebSocketConnectListener(SessionSubscribeEvent event) throws IllegalAccessException {
            if(Objects.equals(event.getMessage().getHeaders().get("simpDestination"), "/user/topic/progress")) {
                webSocketService.sendCurrentProgessToUser(progressService.getProgress(), event.getUser().getName());
            }
        }
    }
    

    WebSocket 服务用于向订阅用户发送消息。 我有一种方法可以将内容广播到某个主题,并且可以仅将其发送给特定用户。广播方式不是原来的方式。我使用SimpUserRegistry 来获取所有订阅者并将消息分别发送给每个订阅者:

    @Controller
    @Service
    public class WebSocketService {
    
        @Autowired
        private SimpMessagingTemplate simpMessagingTemplate;
    
        @Autowired
        private SimpUserRegistry simpUserRegistry;
    
        private static final String WS_PROGRESS_DESTINATION = "/topic/progress";
    
        public void broadcastCurrentProgress(Progress progress) {
            ProgressDto progressDto = new ProgressDto(progress);
            List<String> subscribers = simpUserRegistry.getUsers().stream()
                    .map(SimpUser::getName).collect(Collectors.toList());
            for(String username : subscribers) {
                simpMessagingTemplate.convertAndSendToUser(username, WS_PROGRESS_DESTINATION, progressDto);
            }
        }
    
        public void sendCurrentProgessToUser(Progress progress, String name) {
            ProgressDto progressDto = new ProgressDto(progress);
            simpMessagingTemplate.convertAndSendToUser(name, WS_PROGRESS_DESTINATION, progressDto);
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-11-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-04
      相关资源
      最近更新 更多