【问题标题】:Read asynchronously data from NetworkStream with huge amount of packets使用大量数据包从 NetworkStream 异步读取数据
【发布时间】:2017-03-07 11:46:00
【问题描述】:

在我的应用程序中,每个数据包在开始时都有 2 个字节的长度。但是,经过一段时间后,应用程序开始接收小于零的长度。在同步客户端中,一切正常,但速度太慢。我 100% 确定在 Server 中一切都是正确的。

连接:

    public void Connect(IPAddress ip, int port)
    {
        tcpClient.Connect(ip, port);
        stream = tcpClient.GetStream();
        byte[] len_buffer = new byte[2];
        stream.BeginRead(len_buffer, 0, len_buffer.Length, OnDataRead, len_buffer);
    }

OnDataRead:

    private void OnDataRead(IAsyncResult ar)
    {
            byte[] len = ar.AsyncState as byte[];
            int length = BitConverter.ToInt16(len, 0);
            byte[] buffer = new byte[length];

            int remaining = length;
            int pos = 0;
            while (remaining != 0)
            {
                int add = stream.Read(buffer, pos, remaining);
                pos += add;
                remaining -= add;
            }
            Process(buffer);
            len = new byte[2];

            stream.EndRead(ar);
            stream.BeginRead(len, 0, len.Length, OnDataRead, len);
    }

【问题讨论】:

  • 即使缓冲区长度仅为 2,也不能保证单次调用 Read(或此处为 BeginRead)会导致读取 2 个字节。如果您需要读取特定数量的字节,您总是需要检查结果并发出进一步的读取。
  • 您混淆了同步 stream.Read 和异步 stream.BeginRead。这在这里敲响了一些警钟......

标签: c# sockets asynchronous tcpclient


【解决方案1】:

如我所见,您混淆了同步和异步。这是一种不好的做法。

你想要的是这样的:

var header = ReadHeader(); // 2 bytes
var data = ReadData(header.DataSize);

我没有使用网络流,但是.... 这是我的 async SocketReader 的示例:

public static class SocketReader
{
    // This method will continues read until count bytes are read. (or socket is closed)
    private static void DoReadFromSocket(Socket socket, int bytesRead, int count, byte[] buffer, Action<ArraySegment<byte>> endRead)
    {
        // Start a BeginReceive.
        try
        {
            socket.BeginReceive(buffer, bytesRead, count - bytesRead, SocketFlags.None, (asyncResult) =>
            {
                // Get the bytes read.
                int read = 0;
                try
                {
                    // if this goes wrong, the read remains 0
                    read = socket.EndReceive(asyncResult);
                }
                catch (ObjectDisposedException) { }
                catch (Exception exception)
                {
                    Trace.TraceError(exception.Message);
                }


                // if zero bytes received, the socket isn't available anymore.
                if (read == 0)
                {
                    endRead(new ArraySegment<byte>(buffer, 0, 0));
                    return;
                }

                // increase the bytesRead, (position within the buffer)
                bytesRead += read;

                // if all bytes are read, call the endRead with the buffer.
                if (bytesRead == count)
                    // All bytes are read. Invoke callback.
                    endRead(new ArraySegment<byte>(buffer, 0, count));
                else
                    // if not all bytes received, start another BeginReceive.
                    DoReadFromSocket(socket, bytesRead, count, buffer, endRead);

            }, null);
        }
        catch (Exception exception)
        {
            Trace.TraceError(exception.Message);
            endRead(new ArraySegment<byte>(buffer, 0, 0));
        }
    }

    public static void ReadFromSocket(Socket socket, int count, Action<ArraySegment<byte>> endRead)
    {
        // read from socket, construct a new buffer.
        DoReadFromSocket(socket, 0, count, new byte[count], endRead);
    }

    public static void ReadFromSocket(Socket socket, int count, byte[] buffer, Action<ArraySegment<byte>> endRead)
    {
        // if you do have a buffer available, you can pass that one. (this way you do not construct new buffers for receiving and able to reuse buffers)

        // if the buffer is too small, raise an exception, the caller should check the count and size of the buffer.
        if (count > buffer.Length)
            throw new ArgumentOutOfRangeException(nameof(count));

        DoReadFromSocket(socket, 0, count, buffer, endRead);
    }
}

用法:

SocketReader.ReadFromSocket(socket, 2, (headerData) =>
{
    if(headerData.Count == 0)
    {
        // nothing/closed
        return;
    }

    // Read the length of the data.
    int length = BitConverter.ToInt16(headerData.Array, headerData.Offset);

    SocketReader.ReadFromSocket(socket, length, (dataBufferSegment) =>
    {
        if(dataBufferSegment.Count == 0)
        {
            // nothing/closed
            return;
        }

        Process(dataBufferSegment);

        // extra: if you need a binaryreader..
        using(var stream = new MemoryStream(dataBufferSegment.Array, dataBufferSegment.Offset, dataBufferSegment.Count))
        using(var reader = new BinaryReader(stream))
        {
            var whatever = reader.ReadInt32();
        }
    }
});

您可以通过传递缓冲区来优化接收缓冲区(查看重载)


继续接收: (重复使用接收缓冲区)

public class PacketReader
{
    private byte[] _receiveBuffer = new byte[2];

    // This will run until the socket is closed.    
    public void StartReceiving(Socket socket, Action<ArraySegment<byte>> process)
    {
        SocketReader.ReadFromSocket(socket, 2, _receiveBuffer, (headerData) =>
        {
            if(headerData.Count == 0)
            {
                // nothing/closed
                return;
            }

            // Read the length of the data.
            int length = BitConverter.ToInt16(headerData.Array, headerData.Offset);

            // if the receive buffer is too small, reallocate it.
            if(_receiveBuffer.Length < length)
                _receiveBuffer = new byte[length];

            SocketReader.ReadFromSocket(socket, length, _receiveBuffer, (dataBufferSegment) =>
            {
                if(dataBufferSegment.Count == 0)
                {
                    // nothing/closed
                    return;
                }

                try
                {
                    process(dataBufferSegment);
                }
                catch { }

                StartReceiving(socket, process);
            });
        }); 
    }
}

用法:

private PacketReader _reader;

public void Start()
{
    _reader = new PacketReader(socket, HandlePacket);
}

private void HandlePacket(ArraySegment<byte> packet)
{
    // do stuff.....
}

【讨论】:

  • 谢谢,我一定会用你的课!问题在这里int length = BitConverter.ToInt16(len, 0);。应该是short length
  • short 存储到int 中没有问题。 (原始代码是一个 int 作为 datasize 字段)
  • 对不起,我错误地使用了同步客户端,我将它们分为两个类。我要试试你的代码。
  • 是的,您的代码可以工作,但我必须修改它才能工作直到停止,因为服务器发送的数据包数量未定义。
  • 感谢您的代码。最后我通过使用 ushort 解决了这个问题,但是我在我的应用程序中使用了你的代码,因为它就像一个魅力。 ;-)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2011-12-29
  • 2015-04-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多