alimayun

基于websocket实现消息中心

最近部门有个需求,需要实现一个消息中心,简而言之,就是给各个系统提供与客户交互的桥梁,自然而然需要选择websocket协议,由于我们是使用的spring cloud这一套,因此以springboot为例来进行说明。

一、方案

A、整体方案

先说一下简单的场景,各系统通过Rabbitmq将要发送给客户端的消息推送到消息中心,消息中心再基于ws连接,将消息推送给客户端,实现交互。但是问题来了,生产上有多个节点(至少两台服务器吧),但是客户端只跟其中一台服务器建立ws连接,所以这个session如何维护呢?比如客户端A与服务器1建立连接,此时要推送的消息到了服务器2上,他没有与客户端A的连接,这个消息就无法推送,因此设计方案如下:

 

 

 

 a、客户端与消息中心建立ws连接,各节点维护各自的连接,例如使用ConcurrentHashMap

b、各应用将要推送给客户端的消息发送到rabbitmq,rabbitmq通过广播的方式将消息发送到消息中心的各节点

c、消息中心通过userId判断连接是否在本机维护,如果不在,直接忽略,如果session在本机维护,则推送消息到客户端

 

 

下面有个重要的问题就是rabbitmq的广播机制如何实现,这时候一百度,都是说不同服务订阅不同的队列就实现广播了。我想问一句,我是一个应用,生产上部署多个节点,代码配置都是同一套,你家上生产就部署一个节点啊???

B、Rabbimq广播机制实现(同应用不同节点)

Rabbitmq的topic模式跟其他mq都不大一样,他是指定exchange到queue的模式,如下图:

 

 

 所以我选择在系统启动时,基于雪花算法生成随机id,作为队列名,并且队列非持久化,项目一重启,之前的队列就消失。不过这里有个问题,就是应用关闭时,若此时mq中有消息未消费,就全丢失了,不过我们的场景可接受这种情况,因此选用这种方式。

package com.yunzhangfang.platform.message.gateway.service.mq.consumer;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import com.yunzhangfang.platform.message.gateway.client.dto.message.MessageContentDTO;
import com.yunzhangfang.platform.message.gateway.client.dto.message.MessageDTO;
import com.yunzhangfang.platform.message.gateway.client.dto.user.UserDTO;
import com.yunzhangfang.platform.message.gateway.service.dto.MessageSaveDTO;
import com.yunzhangfang.platform.message.gateway.service.infrastructure.constant.MqConstant;
import com.yunzhangfang.platform.message.gateway.service.infrastructure.dataobject.MessageUser;
import com.yunzhangfang.platform.message.gateway.service.service.MessageService;
import com.yunzhangfang.platform.message.gateway.service.session.SessionManager;
import com.yzf.accounting.common.exception.BizRuntimeException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * 功能:接收来自于各应用的消息,并且将消息推送到目标用户
 * @author lenovo
 */
@Slf4j
@Component
public class MessageCreatedConsumer {

    @Autowired
    private ConnectionFactory connectionFactory;

    /**
     * 接收各应用发送的消息,并将消息保存进入mongodb
     * @throws IOException
     */
    @PostConstruct
    public void handleMessage() throws IOException {
        // id是使用雪花算法随机生成的
        String queue = "FLOW_QUEUE_" + id;
        String exchange = "FANOUT_FLOW_EXCHANGE";
        Connection conn = connectionFactory.createConnection();
        Channel channel = conn.createChannel(false);
        // 1、创建一个队列,id是使用雪花算法随机生成的,并且非持久化的,自动删除的
        channel.queueDeclare(queue, false, false, true, null);
        // 2、创建交换器
        channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);
        // 3、将队列和交换器通过路由键进行绑定。fanout模式路由键直接设置为""。
        channel.queueBind(queue, exchange, "");
        channel.basicConsume(queue, new DefaultConsumer(channel) {
            // 4、当消息到达时执行回调方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                String msg;
                try {
                    msg = new String(body, "UTF-8");
                    log.info("消息中心接收到消息,内容为:{}", msg);
                } catch (Exception e) {
                    log.error("消息中心接收消息出现异常", e);
                }
            }
        });
    }
}

这地方有个坑,因为队列是使用雪花算法生成的id拼装的,所以没办法使用@RabbitListener这个注解,只能通过channel的方式去实现消息消费,只是写起来麻烦一点,本质是一样的。

 

二、springboot集成websocket

springboot集成websokcet主要有两种方式,分别如下:

A、直接基于websocket协议实现

1、配置类

package com.chitic.supplywater.common.config.webSocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 设置webSocket终端服务
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }

}

2、处理类

package com.chitic.supplywater.common.config.webSocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

@ServerEndpoint("/webSocket/{sid}")
@Component
public class WebSocketServer {

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session,@PathParam("sid") String sid) {
       // 维护会话等
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        // 销毁会话等
    }

    /**
     * 收到客户端消息后调用的方法
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        // 收到客户端发送的消息
    }

    /**
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        // 发生错误时
    }
    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }


    /**
     * 群发自定义消息
     * */
    public static void sendInfo(String message,@PathParam("sid") String sid) throws IOException {
        // 从会话管理器中获取会话,进行群发
    }
}

B、基于stomp协议

1、配置类

package com.yunzhangfang.platform.message.gateway.service.infrastructure.config;

import com.yunzhangfang.platform.message.gateway.service.infrastructure.constant.WebSocketConstant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.*;

/**
 * 开启使用STOMP协议来传输基于代理(MessageBroker)的消息,这时候控制器(controller)开始支持@MessageMapping,就像是使用@requestMapping一样。
 * @author lenovo
 */
@EnableWebSocketMessageBroker
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private WebSocketDecoratorFactory webSocketDecoratorFactory;

    @Autowired
    private WebSockHandshakeHandler webSockHandshakeHandler;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
        // 创建一个serverPoint与前端交互
        stompEndpointRegistry.addEndpoint(WebSocketConstant.WEBSOCKET_SERVER_PATH)
                             // 防止跨域问题
                             .setAllowedOrigins("*")
                             // 握手时handler
                             .setHandshakeHandler(webSockHandshakeHandler)
                             // 指定使用SockJS协议
                             .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //全局使用的消息前缀(客户端订阅路径上会体现出来)
        registry.setApplicationDestinationPrefixes("/app");
        //用户订阅主题的前缀,/topic 代表发布广播,即群发 ,/queue 代表点对点,即发指定用户
        registry.enableSimpleBroker("/topic", "/queue");
        //点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
        registry.setUserDestinationPrefix("/user");
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.addDecoratorFactory(webSocketDecoratorFactory);
    }
}
package com.yunzhangfang.platform.message.gateway.service.infrastructure.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import javax.servlet.http.HttpServletRequest;
import java.security.Principal;
import java.util.Map;

@Configuration @Slf4j
public class WebSockHandshakeHandler extends DefaultHandshakeHandler { /** * 此类在客户端与服务端握手的时候触发。
* tips:将请求中的参数userId塞到Principal中,可以理解成塞到websocket的session中,后续可通过Principal principal = session.getPrincipal()获取到
*/ @Override protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) { if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest) request; HttpServletRequest httpRequest = servletServerHttpRequest.getServletRequest(); // 握手取userid final String userId = httpRequest.getParameter("userId"); log.info("客户端入参userId:{}", userId); if (StringUtils.isEmpty(userId)) { return null; } return () -> userId; } return null; } }
package com.yunzhangfang.platform.message.gateway.service.infrastructure.config;

import com.yunzhangfang.platform.message.gateway.service.session.SessionManager;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory;

import java.security.Principal;

/**
 * 服务端和客户端在进行握手挥手时会被执行。进行session的维护
 */
@Component
@Slf4j
public class WebSocketDecoratorFactory implements WebSocketHandlerDecoratorFactory {

    @Autowired
    private SessionManager sessionManager;

    @Override
    public WebSocketHandler decorate(WebSocketHandler handler) {
        return new WebSocketHandlerDecorator(handler) {
            @Override
            public void afterConnectionEstablished(WebSocketSession session) throws Exception {
                log.info("有客户端连接,sessionId:{}", session.getId());
                // principal是自定义塞入到session中的数据(例如我们塞的是userId,后续通过userId可以找到其session)
                Principal principal = session.getPrincipal();
                if (principal != null && StringUtils.isNotBlank(principal.getName())) {
                    // principal.getName获取的就是WebSockHandshakeHandler中塞入的userId
                    if(!sessionManager.isConnected(principal.getName())) {
                        // 身份校验成功,缓存socket连接
                        sessionManager.add(principal.getName(), session);
                        log.info("客户端userId:{}存入redis", principal.getName());
                    }
                }

                super.afterConnectionEstablished(session);
            }

            @Override
            public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
                log.info("客户端退出连接,sessionId:{}", session.getId());
                Principal principal = session.getPrincipal();
                if (principal != null) {
                    // 身份校验成功,移除socket连接
                    sessionManager.remove(principal.getName());
                    log.info("客户端userId:{}从redis中删除", principal.getName());
                }
                super.afterConnectionClosed(session, closeStatus);
            }
        };
    }
}

通过使用WebSocketDecoratorFactory可在连接建立完成或连接关闭时触发,将userId和session存入本地Map。

2、提供客户端请求controller

@Controller
public class MessageCenterController {

    @Autowired
    private MessageService messageService;

    /**
     * 客户端进入页面或刷新页面调用。获取来源列表、每个来源未读消息数以及每个来源消息列表信息
     */
    @MessageMapping("/init/query")
    public void initQuery(String userId) {
        messageService.initQuery(userId);
    }
}

这里也可以使用http去实现,不过既然已经建立了websocket协议,就直接使用ws协议操作更为合适,无需建立另外的连接。

注:客户端请求"/init/query"地址,服务端是没有办法直接返回响应的,必须客户端订阅地址才能拿到服务端返回的信息。

/**
     * 根据用户id查询消息组合信息(应用列表、消息未读数以及消息列表)
     * @param userId
     * @return
     */
    @Override
    public void initQuery(String userId) {
        // 拼装result
        simpMessagingTemplate.convertAndSendToUser(userId, "/init/query", result);
    }

因为我在握手的时候将userId存放到websocket连接的session信息中了,因此通过Springboot提供的SimpMessagingTemplate.convertAndSendToUser(userId...)就能将消息发送到对应客户端。

 

三、客户端代码(js)

var stompClient = null;

//加载完浏览器后  调用connect(),打开双通道
$(function(){
    //打开双通道
    connect()
})

//打开双通道
function connect(){
    var socket = new SockJS(\'http://localhost:8281/message/center?userId=654366374251302912\'); //连接SockJS的endpoint名称为"endpointAric"
    stompClient = Stomp.over(socket);//使用STMOP子协议的WebSocket客户端
    stompClient.connect({},function(frame){//连接WebSocket服务端
        stompQueue();
    });
}

//列队(一对一)
function stompQueue(){
    //通过stompClient.subscribe订阅/user/queue/init/query
    stompClient.subscribe(\'/user/queue/init/query\',function(response){
        var message=JSON.stringify(response.body);
        //alert(message);
    });

    stompClient.send("/app/init/query",{},\'654366374251302912\');
}

//强制关闭浏览器  调用websocket.close(),进行正常关闭
window.onunload = function() {
    disconnect()
}
//关闭双通道
function disconnect(){
    if(stompClient != null) {
        stompClient.disconnect();
    }
    console.log("Disconnected");
}

 

分类:

技术点:

相关文章: