【问题标题】:A websocket's ReceiveAsync method does not await the entire messagewebsocket 的 ReceiveAsync 方法不会等待整个消息
【发布时间】:2014-07-09 12:26:24
【问题描述】:

我通过 websocket 接收 JSON。至少:我是部分的。使用在线 websocket 服务,我收到完整的 JSON 响应(所有 HTML 标记都被忽略)。当我查看在控制台中收到的 JSON 时,我可以看到 HTML 标记(在调试期间使用 HTML 查看器查看它会删除 HTML),但它会突然结束(数据不完整)。

我的缓冲区有足够的空间,我正在使用 async-await 来(据说)等待整个响应进入,然后再继续。

private async Task Receive()
{
  var buffer = new byte[4096 * 20];

  while (_socket.State == WebSocketState.Open)
  {
      var response = await _socket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);

      if (response.MessageType == WebSocketMessageType.Close)
      {
          await
              _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Close response received",
                  CancellationToken.None);
      }
      else
      {
          var result = Encoding.UTF8.GetString(buffer);
          var a = buffer[1000];
          var b = buffer[10000];
          var c = buffer[50000];
          var d = buffer[81000];
          Console.WriteLine(result);
          var responseObject = JsonConvert.DeserializeObject<Response>(result, _requestParameters.ResponseDataType);

          OnSocketReceive.Invoke(this, new SocketEventArgs {Response = responseObject });
          buffer = new byte[4096 * 20];
      }
  }
}   

注意事项:缓冲区足够大,bcd 永远不会被填满。我还应该注意,这只发生在 1-questions-newest-tag-java 请求中,155-questions-active 工作得很好。

在做了一些挖掘之后,我发现response.CloseStatusresponse.CloseStatusDescription 总是nullresponse.Count 总是1396(在 Word 中复制粘贴结果确实显示总是有 1396 个字符)和response.EndOfMessagefalse

挖掘some source code我发现DefaultReceiveBufferSize16 * 1024(足够大),WebSocketGetDefaultKeepAliveInterval()指的是an external implementation(但调试器显示00:00:30)。

这不是超时问题,因为调试器在在线服务收到响应的同时停止。

为什么套接字还没有收到所有数据时我的方法继续执行?

【问题讨论】:

    标签: c# websocket async-await


    【解决方案1】:

    试试这个:

    try
    {
        WebSocketReceiveResult result;
        string receivedMessage = "";
        var message = new ArraySegment<byte>(new byte[4096]);
        do
        {
            result = await WebSocket.ReceiveAsync(message, DisconectToken);
            if (result.MessageType != WebSocketMessageType.Text)
                break;
            var messageBytes = message.Skip(message.Offset).Take(result.Count).ToArray();
            receivedMessage += Encoding.UTF8.GetString(messageBytes);                    
        }
        while (!result.EndOfMessage);
        if (receivedMessage != "{}" && !string.IsNullOrEmpty(receivedMessage))
        {
            ResolveWebSocketResponse.Invoke(receivedMessage, Connection);
            Console.WriteLine("Received: {0}", receivedMessage);
        }
    }
    catch (Exception ex)
    {
        var mes = ex.Message;
    }
    

    【讨论】:

    • 我不确定当 UTF-8 序列跨越消息边界时这种方法是否会中断 - AFAICS,UTF8.GetString() 将失败并出现解码器错误,因为它只有部分序列.
    • 补充:RFC 6455 明确指出:“请注意,特定文本框架可能包含部分 UTF-8 序列”。并且双方可能不依赖于特定的分片,因为中间人可能会重新分片消息。
    【解决方案2】:
    // Read the bytes from the web socket and accumulate all into a list.
    var buffer = new ArraySegment<byte>(new byte[1024]);
    WebSocketReceiveResult result = null;
    var allBytes = new List<byte>();
    
    do
    {
        result = await webSocket.ReceiveAsync(buffer, CancellationToken.None);
        for (int i = 0; i < result.Count; i++)
        {
            allBytes.Add(buffer.Array[i]);
        }
    }
    while (!result.EndOfMessage);
    
    // Optional step to convert to a string (UTF-8 encoding).
    var text = Encoding.UTF8.GetString(allBytes.ToArray(), 0, allBytes.Count);
    

    【讨论】:

      【解决方案3】:

      按照 Noseratio 的回答,我实现了一个临时缓冲区,它将构造整个消息的数据。

      var temporaryBuffer = new byte[BufferSize];
      var buffer = new byte[BufferSize * 20];
      int offset = 0;
      WebSocketReceiveResult response;
      
      while (true)
      {
          response = await _socket.ReceiveAsync(
                               new ArraySegment<byte>(temporaryBuffer), 
                               CancellationToken.None);
          temporaryBuffer.CopyTo(buffer, offset);
          offset += response.Count;
          temporaryBuffer = new byte[BufferSize];
          if (response.EndOfMessage)
          {
              break;
          }
      }
      

      完全实现here

      【讨论】:

      • @vtortola 的答案是安全的,因为他在 MemoryStream 中写入(复制)内容,这些内容会根据需要增长。在您的回答中,最大大小为BufferSize * 20,因此如果超出,您会遇到一些例外情况。如果你知道你永远不会超过BufferSize * 20,你应该总是在buffer 周围创建一个ArraySegment,并指定它从哪里开始。这样你就可以避免CopyTonew byte[BufferSize]while 的每个步骤中。
      • * 20 的原因是什么,而不是让 BufferSize 开始变大?
      • 另外,我查看了完整的实现,看起来它的设计目的是连接、发送、接收和断开连接,但我认为套接字的关键用途之一是保持连接,所以会该实现对性能有影响吗?
      【解决方案4】:

      只是为了完成@Noseratio 响应,代码应该是这样的:

      ArraySegment<Byte> buffer = new ArraySegment<byte>(new Byte[8192]);
      
      WebSocketReceiveResult result= null;
      
      using (var ms = new MemoryStream())
      {
           do
           {
               result = await socket.ReceiveAsync(buffer, CancellationToken.None);
               ms.Write(buffer.Array, buffer.Offset, result.Count);
           }
           while (!result.EndOfMessage);
      
           ms.Seek(0, SeekOrigin.Begin);
      
           if (result.MessageType == WebSocketMessageType.Text)
           {
                using (var reader = new StreamReader(ms, Encoding.UTF8))
                {
                     // do stuff
                }
           }
      }
      

      干杯。

      【讨论】:

      • 感谢您花时间写这篇文章。在我的方法中使用流而不是临时数组是否有优势,或者它们是平等的选择吗?
      • A MemoryStream 在内部是一个 Byte[],可能几乎没有技术优势,但只要我期待具体的消息,我总是使用它们来方便。
      • 注意:应该考虑创建接收数据大小阈值,因为远程端可能会导致内存溢出。要处理这个问题,需要在循环中添加 if (MaxReceivedMessageSize
      【解决方案5】:

      我可能是错的,但我认为您不应该总是立即收到完整的WebSocket 消息。服务器可能会以块的形式发送消息(这对应于使用endOfMessage: false 调用SendAsync)。

      所以,循环执行await _socket.ReceiveAsync() 并累积接收到的块,直到WebSocketReceiveResult.EndOfMessagetrue 或发生错误。

      顺便说一句,您可能应该使用WebSocket.CreateClientBuffer 而不是new ArraySegment&lt;byte&gt;(buffer)

      【讨论】:

      • 使用循环构造消息确实有效。因此,ReceiveAsync 在发送整个块而不是收到整个消息时认为自己“正在等待”,感谢您澄清这一点。至于你的旁注:这样做有什么好处?
      • @JeroenVannevel,关于CreateClientBuffer - 这只是我的直觉,因为 API 就在那里。自己检查参考源中的here,是否是这种情况。
      • 只是补充一下,即使服务器确实发送了一个块(即endOfMessage = true),我发现ClientWebSocket可以接收部分消息,特别是如果它是一个大消息通过网络套接字的内部接收缓冲区大小(默认情况下似乎约为 16k)。因此,按照您的建议,使用循环从块中构建完整消息似乎是必要的。
      • @Noseratio 这似乎是一个内部方法或其他东西 - 查看参考源,如果您想要一个明确大小的缓冲区,它似乎没有那么有用。
      • @rookie1024,哪一个? WebSocket.CreateClientBuffer 当然不是。
      猜你喜欢
      • 2022-11-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-07-02
      相关资源
      最近更新 更多