【问题标题】:Sockets in C#, how can I asynchronously read and write data through a NetworkStreamC# 中的套接字,如何通过 NetworkStream 异步读写数据
【发布时间】:2014-11-13 19:49:32
【问题描述】:

[我仅限于 Visual Studio 2010,因此我无法使用 C# 4.async 和 await。]

我正在为我的一个项目研究网络架构,该项目通过网络在服务器和客户端之间发送数据包,但客户端和服务器必须在等待时继续运行,因此代码必须是非阻塞,所以我想使用异步方法。但是,除了简单的同步一次性 IO 外,我不太了解该怎么做,尤其是在使用 NetworkStream 时。我想做的是:

1) 客户端连接到服务器

2) 服务器接受连接

3) 服务器等待来自客户端的数据

4) 服务器处理数据

5) 服务器响应客户端

6) 当连接打开时,从 3 开始重复。

我想使用 NetworkStream 来包装套接字。但是我是异步 I/O 的新手,我不确定如何在等待响应时不阻塞服务器/客户端代码的其他部分,尤其是使用 NetworkStream 时如何做到这一点。在我的研究中,我看到使用类似这样的示例:

while(true){
    socket.BeginAccept(new AsyncCallback(AcceptCallback), socket );
}

但似乎该循环仍会阻止应用程序。谁能给我一些关于如何做到这一点的指示(哈)?我找不到很多保持连接打开的示例,只有客户端连接->客户端发送->服务器接收->服务器发送->断开连接。我不是要完整的代码,只是需要一些 sn-ps 的一般概念。

【问题讨论】:

  • NetworkStream.BeginRead Method你检查过这个吗..?
  • @DJKRAZE 是的,我有。我知道我需要使用异步结果,这样我就可以在不阻塞的情况下处理信息。但我不知道如何在不阻塞的情况下等待信息。
  • 您需要查找Task await,因为您正在执行此异步操作,您是否也可以显示完整的方法签名...
  • @DJKRAZE 你的意思是 await 关键字?恐怕我仅限于 C# 4。没有异步或等待。
  • 这是 Winforms 吗?如果是这样,为无限客户端接受循环创建一个线程,并为每个新连接创建一个新线程。

标签: c# sockets asynchronous nonblocking


【解决方案1】:

这是一个非常简单的使用 async/await 和 NetworkStream 的示例:

SocketServer.cs:

class SocketServer
{
    private readonly Socket _listen;

    public SocketServer(int port)
    {
        IPEndPoint listenEndPoint = new IPEndPoint(IPAddress.Loopback, port);
        _listen = new Socket(SocketType.Stream, ProtocolType.Tcp);
        _listen.Bind(listenEndPoint);
        _listen.Listen(1);
        _listen.BeginAccept(_Accept, null);
    }

    public void Stop()
    {
        _listen.Close();
    }

    private async void _Accept(IAsyncResult result)
    {
        try
        {
            using (Socket client = _listen.EndAccept(result))
            using (NetworkStream stream = new NetworkStream(client))
            using (StreamReader reader = new StreamReader(stream))
            using (StreamWriter writer = new StreamWriter(stream))
            {
                Console.WriteLine("SERVER: accepted new client");

                string text;

                while ((text = await reader.ReadLineAsync()) != null)
                {
                    Console.WriteLine("SERVER: received \"" + text + "\"");
                    writer.WriteLine(text);
                    writer.Flush();
                }
            }

            Console.WriteLine("SERVER: end-of-stream");

            // Don't accept a new client until the previous one is done
            _listen.BeginAccept(_Accept, null);
        }
        catch (ObjectDisposedException)
        {
            Console.WriteLine("SERVER: server was closed");
        }
        catch (SocketException e)
        {
            Console.WriteLine("SERVER: Exception: " + e);
        }
    }
}

程序.cs:

class Program
{
    private const int _kport = 54321;

    static void Main(string[] args)
    {
        SocketServer server = new SocketServer(_kport);
        Socket remote = new Socket(SocketType.Stream, ProtocolType.Tcp);
        IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Loopback, _kport);

        remote.Connect(remoteEndPoint);

        using (NetworkStream stream = new NetworkStream(remote))
        using (StreamReader reader = new StreamReader(stream))
        using (StreamWriter writer = new StreamWriter(stream))
        {
            Task receiveTask = _Receive(reader);
            string text;

            Console.WriteLine("CLIENT: connected. Enter text to send...");

            while ((text = Console.ReadLine()) != "")
            {
                writer.WriteLine(text);
                writer.Flush();
            }

            remote.Shutdown(SocketShutdown.Send);
            receiveTask.Wait();
        }

        server.Stop();
    }

    private static async Task _Receive(StreamReader reader)
    {
        string receiveText;

        while ((receiveText = await reader.ReadLineAsync()) != null)
        {
            Console.WriteLine("CLIENT: received \"" + receiveText + "\"");
        }

        Console.WriteLine("CLIENT: end-of-stream");
    }
}

这是一个非常简单的示例,将服务器和客户端托管在同一个进程中,并且一次只接受一个连接。这实际上只是为了说明目的。真实世界的场景无疑会包含其他功能以满足他们的需求。

在这里,我将NetworkStreams 包装在StreamReaders 和StreamWriters 中。请注意,您必须致电Flush() 以确保实际发送数据。为了更好地控制 I/O,当然可以直接使用NetworkStream。只需使用Stream.ReadAsync() 方法而不是StreamReader.ReadLineAsync()。另请注意,在我的示例中,写入是同步的。如果您愿意,您也可以使用与阅读所示相同的基本技术将其设为异步。

编辑:

OP 表示他们无法使用async/await。这是使用NetworkStream 和旧式Begin/EndXXX() API 的客户端版本(当然,服务器也会进行类似的更改):

using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;

namespace TestOldSchoolNetworkStream
{
    class Program
    {
        private const int _kport = 54321;

        static void Main(string[] args)
        {
            SocketServer server = new SocketServer(_kport);
            Socket remote = new Socket(SocketType.Stream, ProtocolType.Tcp);
            IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Loopback, _kport);

            remote.Connect(remoteEndPoint);

            using (NetworkStream stream = new NetworkStream(remote))
            {
                // For convenience, These variables are local and captured by the
                // anonymous method callback. A less-primitive implementation would
                // encapsulate the client state in a separate class, where these objects
                // would be kept. The instance of this object would be then passed to the
                // completion callback, or the receive method itself would contain the
                // completion callback itself.
                ManualResetEvent receiveMonitor = new ManualResetEvent(false);
                byte[] rgbReceive = new byte[8192];
                char[] rgch = new char[Encoding.UTF8.GetMaxCharCount(rgbReceive.Length)];
                Decoder decoder = Encoding.UTF8.GetDecoder();
                StringBuilder receiveBuffer = new StringBuilder();

                stream.BeginRead(rgbReceive, 0, rgbReceive.Length, result =>
                {
                    _Receive(stream, rgbReceive, rgch, decoder, receiveBuffer, receiveMonitor, result);
                }, null);

                string text;

                Console.WriteLine("CLIENT: connected. Enter text to send...");

                while ((text = Console.ReadLine()) != "")
                {
                    byte[] rgbSend = Encoding.UTF8.GetBytes(text + Environment.NewLine);

                    remote.BeginSend(rgbSend, 0, rgbSend.Length, SocketFlags.None, _Send, Tuple.Create(remote, rgbSend.Length));
                }

                remote.Shutdown(SocketShutdown.Send);
                receiveMonitor.WaitOne();
            }

            server.Stop();
        }

        private static void _Receive(NetworkStream stream, byte[] rgb, char[] rgch, Decoder decoder, StringBuilder receiveBuffer, EventWaitHandle monitor, IAsyncResult result)
        {
            try
            {
                int byteCount = stream.EndRead(result);
                string fullLine = null;

                if (byteCount > 0)
                {
                    int charCount = decoder.GetChars(rgb, 0, byteCount, rgch, 0);

                    receiveBuffer.Append(rgch, 0, charCount);

                    int newLineIndex = IndexOf(receiveBuffer, Environment.NewLine);

                    if (newLineIndex >= 0)
                    {
                        fullLine = receiveBuffer.ToString(0, newLineIndex);
                        receiveBuffer.Remove(0, newLineIndex + Environment.NewLine.Length);
                    }

                    stream.BeginRead(rgb, 0, rgb.Length, result1 =>
                    {
                        _Receive(stream, rgb, rgch, decoder, receiveBuffer, monitor, result1);
                    }, null);
                }
                else
                {
                    Console.WriteLine("CLIENT: end-of-stream");
                    fullLine = receiveBuffer.ToString();
                    monitor.Set();
                }

                if (!string.IsNullOrEmpty(fullLine))
                {
                    Console.WriteLine("CLIENT: received \"" + fullLine + "\"");
                }
            }
            catch (IOException e)
            {
                Console.WriteLine("CLIENT: Exception: " + e);
            }
        }

        private static int IndexOf(StringBuilder sb, string text)
        {
            for (int i = 0; i < sb.Length - text.Length + 1; i++)
            {
                bool match = true;

                for (int j = 0; j < text.Length; j++)
                {
                    if (sb[i + j] != text[j])
                    {
                        match = false;
                        break;
                    }
                }

                if (match)
                {
                    return i;
                }
            }

            return -1;
        }

        private static void _Send(IAsyncResult result)
        {
            try
            {
                Tuple<Socket, int> state = (Tuple<Socket, int>)result.AsyncState;
                int actualLength = state.Item1.EndSend(result);

                if (state.Item2 != actualLength)
                {
                    // Should never happen...the async operation should not complete until
                    // the full buffer has been successfully sent, 
                    Console.WriteLine("CLIENT: send completed with only partial success");
                }
            }
            catch (IOException e)
            {
                Console.WriteLine("CLIENT: Exception: " + e);
            }
        }
    }
}

请注意,即使省略了一堆异常处理逻辑,这段代码也相当长,至少部分原因是TextReader 没有内置的异步 API,因此处理这里的输入数据要详细得多。当然,这是针对简单的基于行的文本交换协议。其他协议在数据解包方面可能或多或少复杂,但NetworkStream 的底层读写元素是相同的。

【讨论】:

  • 尽管这个答案很好,但我仅限于 VS2010,因此仅限于 C# 4。所以我不能使用异步或等待。我会将此添加到我的问题中。
  • 它只服务一个连接吗?
  • 是的...这个例子一次只允许一个连接。但是,这只是为了使示例更简单。更改此示例以支持多个连接很容易:只需将SocketServer._Accept() 中的BeginAccept() 调用移动到从当前套接字读取的循环之前。然后它将允许同时进行多个连接。
  • @AlphaMCubed:注意前面的评论将您引导至一个链接,该链接显示如何在 VS2010/.NET 4 中启用 async/await。也就是说,如果这仍然不起作用你,我已经在答案中添加了一个客户端版本。我不认为这与您可能会发现的其他示例有很大不同,但我希望它仍然有用。如果您遇到特定实施问题,请随时发布新问题。
  • @PeterDuniho 谢谢,我没看到。我现在就试试。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2011-10-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-12-24
  • 2016-12-17
相关资源
最近更新 更多