【问题标题】:Spring websocket send message from multiple threadsSpring websocket从多个线程发送消息
【发布时间】:2018-07-14 10:28:47
【问题描述】:

我正在为我的一个基于 Spring 的项目使用 Spring WebSocket 服务器实现。我遇到了一个错误,说The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is invalid state。我发现问题是同时从不同的线程写入 websocket。

我如何临时修复它:考虑我已经实现了以下方法

void sendMessageToSession(WebsocketSession session,String message);

将 TextMessage 发送到 websocket 会话。我不能使整个方法同步,因为多个线程可以为不同的 websocketSessions 和消息调用它。我也不能把会话放在同步块中(试过但没用)

虽然,我这样解决了我的问题

synchronized(session.getId()){ 
    //sending message;
}

我不再面临这个问题。但在同步块中使用字符串似乎不是一个好习惯。 那么我还有什么其他解决方案?发送异步消息的最佳方式是什么?

PS:建立连接后我已经使用ConcurrentWebSocketSessionDecorator,并且我正在使用更新的websocket。没有帮助。

session = new ConcurrentWebSocketSessionDecorator(session, (int) StaticConfig.MAXIMUM_WS_ASYNC_SEND_TIMEOUT, StaticConfig.MAXIMUM_WS_BINARY_BUFFER_SIZE * 2);

注意 我将我的 websocet 会话保存在地图中,其中键是 session.getId,值是会话本身。

与其他一些 websocket 实现不同,Spring websocket 引用在每条消息上似乎并不相同。我通过他们的 ID 将会话保存在地图中,并在每条消息上检查传递的 websocket 与我已经放在地图上的 websocket 的相等性,它是错误的。

【问题讨论】:

  • 据我所知,synchronized(session.getId()) 无法解决您的问题..
  • @user27149 好吧,我现在没有遇到任何异常,因为我正在使用它并且系统运行良好,所以我可以说它确实解决了我的问题(暂时是因为我问这个问题是为了找到合适的解决方法)
  • 是的,我知道您正在寻求更好的解决方案...
  • 当您尝试同步(会话)时会发生什么?
  • @WarrenDew 仍然是同样的错误。另请参阅我的笔记

标签: java spring multithreading spring-websocket


【解决方案1】:

通过在我坚持会话的WebsocketSession 后面添加volatile 关键字,我解决了问题。我很高兴知道这是否也是一种不好的做法。但我的想法是,当从多个线程写入 websocket 会话时,这些线程会丢失 websocket 的状态,因为它尚未更新,这就是引发此异常的原因。

通过添加 volatile,我们确保 websocket 状态在另一个线程使用它之前已更新,因此写入 websocket 可以按预期同步工作。

我创建了一个名为 SessionData 的类,它包含 websocketSession 和我需要的有关会话的所有其他数据。

public class SessionData {
    private volatile WebSocketSession websocketSession;
    //...other 
    // getters and setters ...
}

我使用 SessionData 作为映射的值,其中会话 ID 是键

然后当从 SessionData 获取 websocketSession 并从不同的线程写入它时,volatile 帮助我获取更新的 websocketSession。


更新(2020 年)

这里的一个重要注意事项是,每次要向会话发送消息时都应该使用sessionData.getWebsocketSession.sendMessage(...)。你不应该直接使用会话,这意味着像 this 这样的代码是一个不好的做法

WebSocketSession websocketSession = sessionData.getWebSocketSession();
websocketSession.sendMessage(...);

您永远不会知道这两行代码之间的 websocket 会话发生了什么变化(在您的情况下可能超过 2 行)。

这样的代码更好:

sessionData.getWebSocketSession().sendMessage(...);

也永远不要直接发布到在 Spring websocket MessageHandlers 中传递给您的会话中。否则您可能会再次遇到该错误。

这就是为什么在连接打开时将WebSocketSessionsessionId 映射到SessionData 的良好做法。您可以使用此存储库使用会话 ID 获取 volatile session,而不是直接使用会话。

【讨论】:

  • 哇,谢谢!这就是我一直在寻找的。标记volatile 整个地图(其中会话ID 是键)和标记volatile 地图值内的会话对象(SessionData 类)之间是否存在显着差异?
  • @NikolayShevchenko 我建议你把它放在 sessionData 类的会话后面。只是考虑这个概念,我应该说这意味着我们正在访问从 map 中获取 websocketSession 的最后一个状态。但是如果你把它放在地图后面,你正在访问地图的最后状态,这不能保证地图内的对象的任何事情。只要确保您使用的是ConcurrentHashMap
  • @SepGH “也永远不要直接发布到在 Spring websocket MessageHandlers 中传递给您的会话中。否则您可能会再次遇到该错误”。如果您能详细说明,将非常有帮助。
  • @JaykshatriyaShaktawat 好吧,整个想法是,您在 SessionData 中编写的 websocket 会话的 getter 正在将 volatile session 传递给您,但是如果您引入一个新变量来保存该引用将不再是volatile。因此,与其从 getter 中创建具有值的新变量,不如在获得会话后直接调用所需的方法可能是一个更好的主意。您可以想象,如果您的应用程序在多个线程上运行,如果您使用新变量,将无法保证会话同步,
  • 换句话说,这是好的:sessionData.getWebSocketSession().sendMessage(...),这是不好的:WebSocketSession websocketSession = sessionData.getWebSocketSession(); websocketSession.sendMessage()
【解决方案2】:

ConcurrentWebSocketSessionDecorator 就像多线程中的魅力一样,它是为它而设计的。 您的地图实现可能有问题。

示例代码:

private final Map<String, SessionData> sessions = new ConcurrentHashMap<>();

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception 
{
    // Use the following will crash :
    //sessions.put(session.getId(), new SessionData(session));

    // Use ConcurrentWebSocketSessionDecorator is safe :
    sessions.put(session.getId(), new SessionData(new ConcurrentWebSocketSessionDecorator (session, 2000, 4096)));
    super.afterConnectionEstablished(session);
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception
{
    sessions.remove(session.getId());
    super.afterConnectionClosed(session, status); 
}

public void send(WebSocketSession session, String msg) throws MessagingException {
    try {
        session.sendMessage (new TextMessage(msg));
    } catch (IOException ex) {
        throw new MessagingException(ex.getMessage());
    }
}

轻松测试多线程中的行为:

    public void sendMT(WebSocketSession session, String msg) throws MessagingException{
    for (int i=0; i<3; i++){
        new Thread(){
          @Override
          public void run(){
              send (session, msg);
        }.start();  
    }
}

【讨论】:

  • 这是正确答案...上面接受的“update 2020”答案不是线程安全的。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-07-02
相关资源
最近更新 更多