【问题标题】:Spring Websocket in a tomcat clusterTomcat集群中的Spring Websocket
【发布时间】:2023-03-12 13:04:01
【问题描述】:

在我们当前的应用程序中,我们使用 Spring Websockets 而不是 STOMP。我们正在寻求水平扩展。关于我们应该如何处理多个 tomcat 实例上的 websocket 流量以及我们如何跨多个节点维护会话信息,是否有任何最佳实践。是否有可以参考的工作示例?

【问题讨论】:

    标签: spring spring-mvc spring-websocket


    【解决方案1】:

    您的需求可以分为 2 个子任务:

    1. 跨多个节点维护会话信息:您可以尝试由 Redis 支持的 Spring Sessions 集群(请参阅:HttpSession with Redis)。这非常非常简单,并且已经支持 Spring Websockets(参见:Spring Session & WebSockets)。

    2. 通过多个 tomcat 实例处理 websockets 流量:有几种方法可以做到这一点。

      • 第一种方式:使用全功能代理(例如:ActiveMQ)并尝试新功能Support multiple WebSocket servers(来自:4.2.0 RC1)
      • 第二种方式:使用全功能代理并实现分布式UserSessionRegistry(例如:使用Redis :D)。默认实现DefaultUserSessionRegistry 使用内存存储。

    更新:我用Redis写了一个简单的实现,有兴趣可以试试

    要配置一个全功能的代理(broker relay),可以尝试:

    public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    
        ...
    
        @Autowired
        private RedisConnectionFactory redisConnectionFactory;
    
        @Override
        public void configureMessageBroker(MessageBrokerRegistry config) {
            config.enableStompBrokerRelay("/topic", "/queue")
                .setRelayHost("localhost") // broker host
                .setRelayPort(61613) // broker port
                ;
            config.setApplicationDestinationPrefixes("/app");
        }
    
        @Bean
        public UserSessionRegistry userSessionRegistry() {
            return new RedisUserSessionRegistry(redisConnectionFactory);
        }
    
        ...
    }
    

    import java.util.Set;
    
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.BoundHashOperations;
    import org.springframework.data.redis.core.BoundSetOperations;
    import org.springframework.data.redis.core.RedisOperations;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    import org.springframework.messaging.simp.user.UserSessionRegistry;
    import org.springframework.util.Assert;
    
    /**
     * An implementation of {@link UserSessionRegistry} backed by Redis.
     * @author thanh
     */
    public class RedisUserSessionRegistry implements UserSessionRegistry {
    
        /**
         * The prefix for each key of the Redis Set representing a user's sessions. The suffix is the unique user id.
         */
        static final String BOUNDED_HASH_KEY_PREFIX = "spring:websockets:users:";
    
        private final RedisOperations<String, String> sessionRedisOperations;
    
        @SuppressWarnings("unchecked")
        public RedisUserSessionRegistry(RedisConnectionFactory redisConnectionFactory) {
            this(createDefaultTemplate(redisConnectionFactory));
        }
    
        public RedisUserSessionRegistry(RedisOperations<String, String> sessionRedisOperations) {
            Assert.notNull(sessionRedisOperations, "sessionRedisOperations cannot be null");
            this.sessionRedisOperations = sessionRedisOperations;
        }
    
        @Override
        public Set<String> getSessionIds(String user) {
            Set<String> entries = getSessionBoundHashOperations(user).members();
            return (entries != null) ? entries : Collections.<String>emptySet();
        }
    
        @Override
        public void registerSessionId(String user, String sessionId) {
            getSessionBoundHashOperations(user).add(sessionId);
        }
    
        @Override
        public void unregisterSessionId(String user, String sessionId) {
            getSessionBoundHashOperations(user).remove(sessionId);
        }
    
        /**
         * Gets the {@link BoundHashOperations} to operate on a username
         */
        private BoundSetOperations<String, String> getSessionBoundHashOperations(String username) {
            String key = getKey(username);
            return this.sessionRedisOperations.boundSetOps(key);
        }
    
        /**
         * Gets the Hash key for this user by prefixing it appropriately.
         */
        static String getKey(String username) {
            return BOUNDED_HASH_KEY_PREFIX + username;
        }
    
        @SuppressWarnings("rawtypes")
        private static RedisTemplate createDefaultTemplate(RedisConnectionFactory connectionFactory) {
            Assert.notNull(connectionFactory, "connectionFactory cannot be null");
            StringRedisTemplate template = new StringRedisTemplate(connectionFactory);
            template.setKeySerializer(new StringRedisSerializer());
            template.setValueSerializer(new StringRedisSerializer());
            template.afterPropertiesSet();
            return template;
        }
    
    }
    

    【讨论】:

    • 看起来 UserSessionRegistry 已被弃用,最新的是 SimpUserRegistry。方法和实现方式完全不同?您能了解如何自定义该类吗?
    【解决方案2】:

    水平扩展 WebSockets 实际上与水平扩展无状态/有状态的仅基于 HTTP 的应用程序非常不同。

    水平扩展无状态 HTTP 应用程序:只需在不同机器上启动一些应用程序实例,然后在它们前面放置一个负载均衡器。有很多不同的负载均衡器解决方案,例如 HAProxy、Nginx 等。如果您在 AWS 等云环境中,您还可以使用 Elastic Load Balancer 等托管解决方案。

    水平扩展有状态 HTTP 应用程序:如果我们可以让所有应用程序每次都是无状态的,那就太好了,但不幸的是,这并不总是可行的。因此,在处理有状态 HTTP 应用程序时,您必须关心 HTTP 会话,它基本上是每个不同客户端的 本地 存储,Web 服务器可以在其中存储跨不同 HTTP 请求保存的数据(例如在处理购物车时)。那么,在这种情况下,当水平扩展时,您应该知道,正如我所说,它是一个 LOCAL 存储,因此 ServerA 将无法处理 ServerB 上的 HTTP 会话。换句话说,如果出于某种原因,正在由 ServerA 提供服务的 Client1 突然开始由 ServerB 提供服务,他的 HTTP 会话将丢失(并且他的购物车将消失!)。原因可能是节点故障,甚至是部署。 为了解决这个问题,您不能只在本地保留 HTTP 会话,也就是说,您必须将它们存储在另一个外部组件中。那是几个能够处理这个问题的组件,例如任何关系数据库,但这实际上是一种开销。一些 NoSQL 数据库可以很好地处理这种键值行为,例如 Redis。 现在,将 HTTP 会话存储在 Redis 上,如果客户端开始由另一台服务器提供服务,它将从 Redis 获取客户端的 HTTP 会话并将其加载到其内存中,因此一切将继续工作,用户不会丢失他的HTTP会话了。 您可以使用 Spring Session 轻松地将 HTTP 会话存储在 Redis 上。

    水平扩展 WebSocket 应用程序:当建立 WebSocket 连接时,服务器必须保持与客户端的连接打开,以便它们可以双向交换数据。当客户端正在收听诸如“/topic/public.messages”之类的目的地时,我们说客户端订阅了该目的地。在 Spring 中,当您使用 simpleBroker 方法时,订阅会保存在 内存中,例如,如果 Client1 正在由 ServerA 提供服务并希望使用 WebSocket 向正在服务的 Client2 发送消息,会发生什么情况由服务器B?你已经知道答案了!消息不会传递给 Client2,因为 Server1 甚至不知道 Client2 的订阅。 因此,为了解决这个问题,您必须再次将 WebSockets 订阅外部化。当您使用 STOMP 作为子协议时,您需要一个可以充当外部 STOMP 代理的外部组件。有很多工具可以做到这一点,但我建议使用 RabbitMQ。 现在,您必须更改您的 Spring 配置,以便它不会保留订阅in-memory。相反,它将订阅委托给外部 STOMP 代理。您可以通过enableStompBrokerRelay 等一些基本配置轻松实现此目的。 需要注意的重要一点是 HTTP 会话不同于 WebSocket 会话使用 Spring Session 将 HTTP 会话存储在 Redis 中与水平扩展 WebSockets 完全无关

    我已经用 Spring Boot(以及更多)编写了一个完整的 Web 聊天应用程序,它使用 RabbitMQ 作为一个完整的外部 STOMP 代理,它是 public on GitHub 所以请克隆它,在你的机器上运行应用程序并查看代码详细信息.

    当涉及到 WebSocket 连接丢失时,Spring 无能为力。实际上,重新连接必须由实现重新连接回调函数的客户端请求,例如(即 WebSocket 握手流程,客户端必须启动握手,而不是服务器)。有一些客户端库可以为您透明地处理这个问题。这不是 SockJS 的情况。在聊天应用程序中,我还实现了这个重新连接功能。

    【讨论】:

      【解决方案3】:

      跨多个节点维护会话信息:

      假设我们有 2 个服务器主机,由负载均衡器备份。

      Websockets 是从浏览器到特定服务器 host.eg host1 的套接字连接

      现在,如果主机 1 出现故障,来自负载平衡器的套接字连接 - 主机 1 将中断。 spring 如何重新打开从负载均衡器到主机 2 的相同 websocket 连接?浏览器不应打开新的 websocket 连接

      【讨论】:

      • 但是,这种情况可以通过前端重新连接到相同的 url 来处理,并且 LB 应该将请求定向到活动服务器。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-03-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多