【问题标题】:C# Issues with Multi-Threading and Socket多线程和套接字的 C# 问题
【发布时间】:2013-10-14 06:53:33
【问题描述】:

首先,我只是想让你知道我对编程并不陌生,帮助我应该更容易:)

我在 C# 中使用 Socket 进行的多线程聊天遇到问题。

我有 3 个线程:

  • void ListenSocketConnection:检查可以连接的 Socket。已连接的 Socket 被添加到 List
  • void CheckIfClientStillConnectedThread : 检查 Socket 是否断开。从列表中删除断开连接的套接字
  • void ReceiveDataListener : 检查 Socket 是否接收到数据
    • 这是问题所在。如果第一个或第二个线程从 List 中删除一个 Socket,'foreach (ClientManager cManager in clientsList)' 将引发异常。
    • 这是第二个问题。如果在该 foreach 期间套接字断开连接,“foreach ClientManager cManager in clientsList)”将引发异常:DisposedException

你对我如何解决这个问题有什么建议吗?

这是我的代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net;
using System.Net.Sockets;
using System.ComponentModel;
using System.Threading;

namespace JAChat.Library
{
    class SocketServer
    {
        private Socket socketServer;
        private BackgroundWorker bwSocketConnectListener;
        private BackgroundWorker bwCheckIfConnected;
        private BackgroundWorker bwReceiveDataListener;
        private List<ClientManager> clientsList;

        #region Constructor
        public SocketServer(int port)
        {
            clientsList = new List<ClientManager>();

            socketServer = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            socketServer.Bind(new IPEndPoint(IPAddress.Any, port));
            socketServer.Listen(100);

            bwSocketConnectListener = new BackgroundWorker();
            bwSocketConnectListener.DoWork += new DoWorkEventHandler(ListenSocketConnection);
            bwSocketConnectListener.RunWorkerAsync();

            bwCheckIfConnected = new BackgroundWorker();
            bwCheckIfConnected.DoWork += CheckIfClientStillConnectedThread;
            bwCheckIfConnected.RunWorkerAsync();

            bwReceiveDataListener = new BackgroundWorker();
            bwReceiveDataListener.DoWork += ReceiveDataListener;
            bwReceiveDataListener.RunWorkerAsync();
        }
        #endregion

        #region Getter
        public List<ClientManager> connectedClients
        {
            get
            {
                return clientsList;
            }
        }
        #endregion

        #region Public Methods
        /// <summary>
        /// Parse and send the command object to targets
        /// </summary>
        public void sendCommand(Command cmd)
        {
            BackgroundWorker test = new BackgroundWorker();
            test.DoWork += delegate {
                foreach(ClientManager cManager in clientsList){
                    cManager.sendCommand(cmd);
                }
            };
            test.RunWorkerAsync();
        }

        /// <summary>
        /// Disconnect and close the socket
        /// </summary>
        public void Disconnect()
        {
            socketServer.Disconnect(false);
            socketServer.Close();
            socketServer = null; //Stop some background worker
        }
        #endregion

        #region Private Methods
        private void ListenSocketConnection(object sender, DoWorkEventArgs e)
        {
            while (socketServer != null)
            {
                //Get and WAIT for new connection
                ClientManager newClientManager = new ClientManager(socketServer.Accept());
                clientsList.Add(newClientManager);
                onClientConnect.Invoke(newClientManager);
            }
        }

        private void CheckIfClientStillConnectedThread(object sender, DoWorkEventArgs e){
            while(socketServer != null){
                for(int i=0;i<clientsList.Count;i++){
                    if(clientsList[i].socket.Poll(10,SelectMode.SelectRead) && clientsList[i].socket.Available==0){
                        clientsList[i].socket.Close();
                        onClientDisconnect.Invoke(clientsList[i]);
                        clientsList.Remove(clientsList[i]);
                        i--;                        
                    }
                }
                Thread.Sleep(5);
            }
        }

        private void ReceiveDataListener(object unused1, DoWorkEventArgs unused2){
            while (socketServer != null){
                foreach (ClientManager cManager in clientsList)
                {
                    try
                    {
                        if (cManager.socket.Available > 0)
                        {
                            Console.WriteLine("Receive Data Listener 0");
                            //Read the command's Type.
                            byte[] buffer = new byte[4];
                            int readBytes = cManager.socket.Receive(buffer, 0, 4, SocketFlags.None);
                            Console.WriteLine("Receive Data Listener 1");
                            if (readBytes == 0)
                                break;
                            Console.WriteLine("Receive Data Listener 2");
                            CommandType cmdType = (CommandType)(BitConverter.ToInt32(buffer, 0));
                            Console.WriteLine("Receive Data Listener 3");

                            //Read the sender IP size.
                            buffer = new byte[4];
                            readBytes = cManager.socket.Receive(buffer, 0, 4, SocketFlags.None);
                            if (readBytes == 0)
                                break;
                            int senderIPSize = BitConverter.ToInt32(buffer, 0);

                            //Read the sender IP.
                            buffer = new byte[senderIPSize];
                            readBytes = cManager.socket.Receive(buffer, 0, senderIPSize, SocketFlags.None);
                            if (readBytes == 0)
                                break;
                            IPAddress cmdSenderIP = IPAddress.Parse(System.Text.Encoding.ASCII.GetString(buffer));

                            //Read the sender name size.
                            buffer = new byte[4];
                            readBytes = cManager.socket.Receive(buffer, 0, 4, SocketFlags.None);
                            if (readBytes == 0)
                                break;
                            int senderNameSize = BitConverter.ToInt32(buffer, 0);

                            //Read the sender name.
                            buffer = new byte[senderNameSize];
                            readBytes = cManager.socket.Receive(buffer, 0, senderNameSize, SocketFlags.None);
                            if (readBytes == 0)
                                break;
                            string cmdSenderName = System.Text.Encoding.Unicode.GetString(buffer);

                            //Read target IP size.
                            string cmdTarget = "";
                            buffer = new byte[4];
                            readBytes = cManager.socket.Receive(buffer, 0, 4, SocketFlags.None);
                            if (readBytes == 0)
                                break;
                            int targetIPSize = BitConverter.ToInt32(buffer, 0);

                            //Read the command's target.
                            buffer = new byte[targetIPSize];
                            readBytes = cManager.socket.Receive(buffer, 0, targetIPSize, SocketFlags.None);
                            if (readBytes == 0)
                                break;
                            cmdTarget = System.Text.Encoding.ASCII.GetString(buffer);

                            //Read the command's MetaData size.
                            string cmdMetaData = "";
                            buffer = new byte[4];
                            readBytes = cManager.socket.Receive(buffer, 0, 4, SocketFlags.None);
                            if (readBytes == 0)
                                break;
                            int metaDataSize = BitConverter.ToInt32(buffer, 0);

                            //Read the command's Meta data.
                            buffer = new byte[metaDataSize];
                            readBytes = cManager.socket.Receive(buffer, 0, metaDataSize, SocketFlags.None);
                            if (readBytes == 0)
                                break;
                            cmdMetaData = System.Text.Encoding.Unicode.GetString(buffer);

                            //Create the command object
                            Command cmd = new Command(cmdType, cmdSenderIP, cmdSenderName, IPAddress.Parse(cmdTarget), cmdMetaData);
                            this.onCommandReceived(cmd);
                        }
                    }
                    catch (ObjectDisposedException) {/*Le socket s'est déconnectée pendant le for each. Ignore l'érreur et retourne dans le while*/ }
                    catch (InvalidOperationException) { /* clientsList a été modifié pendant le foreach et délanche une exception. Retour while*/}
                }                
            }
            Console.WriteLine("Receive data listener closed");
        }
        #endregion

        #region Events
        public delegate void OnClientConnectEventHandler(ClientManager client);
        /// <summary>
        /// Events invoked when a client connect to the server
        /// </summary>
        public event OnClientConnectEventHandler onClientConnect = delegate { };

        public delegate void OnClientDisconnectEventHandler(ClientManager client);
        /// <summary>
        /// Events invoked when a client disconnect from the server
        /// </summary>
        public event OnClientDisconnectEventHandler onClientDisconnect = delegate { };

        public delegate void OnCommandReceivedEventHandler(Command cmd);
        /// <summary>
        /// Events invoked when a command has been sent to the server
        /// </summary>
        public event OnCommandReceivedEventHandler onCommandReceived = delegate { };
        #endregion
    }
}

【问题讨论】:

  • 检查是否接收到的套接字通常会引发异常。为什么?什么例外?
  • 有什么理由不使用Asynchronous 套接字?
  • @usr 我想你错过了阅读第一行。 OP 明确表示 我对编程并不陌生
  • 没错,我不是编程新手 :) 很抱歉,如果解释不清楚,请看我编辑的 OP :)

标签: c# multithreading list sockets


【解决方案1】:
  1. 有多个线程,但我没有看到任何同步。这是不正确的。使用锁来保护可变的共享状态。
  2. 与其对所有套接字进行集中管理、对DataAvailable 进行轮询和检查,为什么不只使用每个套接字一个线程或异步IO 呢?这样,您一次只能管理一个套接字。无需轮询。您只需调用Read(或其异步版本)并等待数据到达。这是处理套接字的更好范例。基本上,如果您的套接字代码包含DataAvailable 或轮询,那么您将违反最佳实践(并且可能在某处存在错误)。想一想在不使用这两者的情况下如何解决这个问题。这是可能的,而且更好。
  3. ReceiveDataListener 中,您假设如果数据可用,则整个消息可用。这是错误的,因为 TCP 是面向流的。您可以以任意小块接收发送的数据。按照我的观点 (2) 解决这个问题。

详细说明(2):这基本上是一个演员模型。每个插槽一个演员。无论您是使用线程、async/await 还是使用传统异步 IO 来实现 Actor 都无关紧要。

希望这会有所帮助。随时在下面的 cmets 中提出后续问题。

【讨论】:

  • 感谢您的回答,我习惯于使用 BackgroundWorket 或 'new Thread()' 进行线程化,但是您似乎在谈论不同的东西,我不确定是否理解。另外,每个套接字创建一个线程不是错的吗?我的意思是线程太多不是很好?您是否有一个很好的例子可以说明您要告诉我的内容?谢谢:)
  • 如果套接字很少(比如几十个),每个套接字一个线程/任务/后台工作者几乎总是可以的。如果有更多,请使用异步 IO,而不是轮询。对于异步 IO 在 C# 4 上的工作方式,这里有一个看起来很可靠的示例:msdn.microsoft.com/en-us/library/fx6588te.aspx。在 C# 5 上,您将使用 async/await,这几乎是微不足道的。
【解决方案2】:

集合正在被多个线程修改,因此每次询问时计数都可能非常大。因此,您应该将其设置为固定金额;即在循环之前,然后遍历列表。此外,由于您要删除元素,因此倒计时而不是倒计时是更好的选择。

考虑以下代码:

private void CheckIfClientStillConnectedThread(object sender, DoWorkEventArgs e)
{
    while (socketServer != null)
    {
        int count = clientsList.Count -1;
        for (int i=count; i >= 0 ; i--)
        {
            if (clientsList[i].socket.Poll(10, SelectMode.SelectRead) && clientsList[i].socket.Available == 0)
            {
                clientsList[i].socket.Close();
                onClientDisconnect.Invoke(clientsList[i]);
                clientsList.Remove(clientsList[i]);
            }
        }
        Thread.Sleep(5);
    }
}

【讨论】:

  • 哈哈谢谢,确实可以避免一些例外,如果一些东西被添加或从列表中删除。感谢您的提示!
猜你喜欢
  • 2016-12-27
  • 2014-03-12
  • 2016-03-16
  • 2012-09-07
  • 1970-01-01
  • 2019-09-21
  • 2011-06-14
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多