【问题标题】:How to implement an Asynchronous TCP Client如何实现异步 TCP 客户端
【发布时间】:2019-09-13 15:46:03
【问题描述】:

以下情况:
我使用 TCP/IP 与设备进行远程通信。我向设备发送命令。设备执行命令并将结果消息发送给我asynchronously。消息之间的时间可以在很大范围内变化。我不想要一个等待所有消息都收到的阻塞功能。所以我正在使用callback functions,所以只有在收到消息时才会调用该函数。
在下一步中,我将这些收到的消息附加到一个字符串值。在后台,我想process the messages 并从字符串值中删除处理后的消息。

现在我的问题:
1. 我是新手,所以我想先请教有经验的人了解一下实施。
2. 处理响应消息的最佳时机在哪里?
3. 我是否必须使用 Mutex,因为我可能会写入和读取响应?

回复
有两种不同类型的响应:标准响应和通知。在处理消息时,我必须区分这些类型。
XML 响应消息对应如下:

<?xml version="1.0" encoding="utf8" standalone="yes" ?>
<Telegram xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchemainstance" xmlns="LancePlatform">
    Response or Notification
</Telegram>

仅供参考: XML 响应总是以

开头
<?xml version="1.0" encoding="utf8 standalone="yes" ?>

并以 CR+LF 序列结束

我的代码:

public class StateObject
{
    // Client socket.  
    public TcpClient workSocket = null;
    // Size of receive buffer.  
    public const int BufferSize = 4096;
    // Receive buffer.  
    public byte[] buffer = new byte[BufferSize];
    // Received data string.  
    public StringBuilder sb = new StringBuilder();
}

public class Client
{
    private StateObject state;
    private String response { get; set; }

    public void Connect(string host, Int32 port)
    {
        try
        {
            client = new StateObject();
            client.workSocket = new TcpClient(host, port);
        }
        // SocketException
    }

    public void BeginRead()
    {
        NetworkStream ns = state.workSocket.GetStream();
        ns.BeginRead(state.buffer, 0, StateObject.BufferSize, EndRead, state);
    }

    private void EndRead(IAsyncResult AR)
    {
        try
        {
            state  = (StateObject)AR.AsyncState;

            NetworkStream ns = state.workSocket.GetStream();

            // Get the number of received bytes
            int receivedBytes = ns.EndRead(AR);

            // Append the message to the string value.
            state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, receivedBytes));

            // Get the number of remaining bytes.
            int remainingBytes = state.workSocket.Available;

            // More data available
            if (remainingBytes != 0)
            {
                ns.BeginRead(state.buffer, 0, StateObject.BufferSize, EndRead, state);
            } else 
            {
                // Should I process the messages here or somewhere else, 
                // so I don't block this function with processing messages?
                ProcessMessages();
            }
        }
        // SocketException
    }

    public void ProcessMessages()
    {
        // Split the message by CR+LF. Because each message ends with that.
        string[] messages = Regex.Split(response, "\r\n");
        foreach (string message in messages)
        {
            // Deserialize XML String to Object.
            // Remove message string from response string.
        }
    }
}

static class Program
{
    static void Main()
    {
        // ...
        Client client = new Client();
        client.Connect("127.0.0.1", 8888);
        client.BeginRead();
        // ...
    }
}

【问题讨论】:

  • 查看 msdn 示例。示例使用 Socket,但您可以用继承套接字的任何类替换套接字,如 TCPClient 或 TCPListener :docs.microsoft.com/en-us/dotnet/framework/network-programming/…
  • 我不确定这是否可能,因为 TcpClient 例如没有方法 Receive, ReceiveCallback, Send,... 在示例中使用你发送的。
  • 是的。套接字属性位于客户端属性下。
  • 你是对的!谢谢!那么另外两个问题呢?你有什么建议吗?
  • 大多数程序员对 7 个网络层(包括 Microsoft 示例)一无所知。因此,您将看到大多数软件在异步接收方法中进行响应,这是一种简单的解决方案,它是传输层。正确的方法是在应用层发送和处理响应。所以我喜欢做的是使用一个状态对象,并在接收方法中将接收数据写入状态对象。然后让应用层从状态对象中读取数据。

标签: c# xml asynchronous tcp tcpclient


【解决方案1】:

我不知道通过回答我自己的问题来询问我的新实现是否正确。

方法如下:

  • 与设备连接

第一次请求

  • 发送请求
  • 开始阅读
  • 处理响应或多个响应

第二次请求

  • 发送下一个请求
  • 开始阅读
  • 处理响应或多个响应

我的代码:

public class StateObject
{
    // Client socket.  
    public TcpClient workSocket = null;
    // Size of receive buffer.  
    public const int BufferSize = 4096;
    // Receive buffer.  
    public byte[] buffer = new byte[BufferSize];
    // Received data string.  
    public StringBuilder data = new StringBuilder();

    public string responseMessage = "";
}

 public class Client
{
    private StateObject state;
    // Create a new Mutex. The Mutex is when we're writing ore reating the Data of the State Object.
    private static Mutex mut = new Mutex();

    public static ManualResetEvent MessageReceived = new ManualResetEvent(false);
    public StateObject State { get => state; set => state = value; }

    private void EndRead(IAsyncResult AR)
    {
        try
        {
            State = (StateObject)AR.AsyncState;

            NetworkStream ns = State.workSocket.GetStream();

            // Get the number of received bytes
            int receivedBytes = ns.EndRead(AR);

            // Connection Closed
            if (receivedBytes == 0)
            {
                return;
            }

            if (receivedBytes > 0)
            {
                // Append buffer content to String.    
                mut.WaitOne();
                State.data.Append(Encoding.ASCII.GetString(State.buffer, 0, receivedBytes));
                mut.ReleaseMutex();

                if(State.data.ToString().Contains(MettlerToledo.Message.END_DELIMITER))
                {
                    // Copy thread save the content into the Response Message.
                    mut.WaitOne();
                    State.responseMessage = State.data.ToString();
                    mut.ReleaseMutex();
                    // Send the signal we got the entire message. So the message can be processed.
                    MessageReceived.Set();
                } else
                {
                    // Message not yet complete, so we're waiting for more data.
                    ns.BeginRead(State.buffer, 0, StateObject.BufferSize, EndRead, state);
                }
            }
        }
        // Catch Exceptions
    }


    public void BeginRead()
    {
        Console.WriteLine("Begin Read");
        try
        {
            NetworkStream ns = State.workSocket.GetStream();
            ns.BeginRead(State.buffer, 0, StateObject.BufferSize, EndRead, State);

        }
        catch (SocketException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}

static class Program
{
    [STAThread]
    static void Main()
    {
        Client client = new Client();
        client.Connect("127.0.0.1", 62324);

        // F I R S T   R E Q U E S T   -    O N E   R E S P O N S E 
        // Send Request
        client.BeginSend(/* Some XML Request string*/);
        // Start Reading
        client.BeginRead();
        // Waiting for Response
        Client.MessageReceived.WaitOne();
        /* Process the responseMessage
         * ...
         */
        // Clear the Processed Data 
        client.State.data.Clear();
        Client.MessageReceived.Reset();

        // S E C O N D   R E Q U E S T   -   M U L T I P L E   R E S P O N S E S
        // Send Next Request 
        client.BeginSend(/* Some XML Request string*/);
        // Start Reading again
        client.BeginRead();
        // Waiting for 1. Response
        Client.MessageReceived.WaitOne();
        /* Process the next responseMessage
         * ...
         */
        // Clear the Processed Data 
        client.State.data.Clear();
        Client.MessageReceived.Reset();

        // Waiting for 2. Response
        client.BeginRead();
        Client.MessageReceived.WaitOne();
        /* Process the next responseMessage
         * ...
         */
        // Clear the Processed Data 
        client.State.data.Clear();
        Client.MessageReceived.Reset();


        // N E X T   R E Q U E S T S 
        // ....
    }
}

【讨论】:

    猜你喜欢
    • 2014-04-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-10-15
    • 2016-05-24
    • 1970-01-01
    相关资源
    最近更新 更多