【问题标题】:Monitor.Wait/Pulse race condition in a multithreaded server多线程服务器中的 Monitor.Wait/Pulse 竞争条件
【发布时间】:2009-05-12 08:00:58
【问题描述】:

我在多线程 TCP 服务器中遇到了联锁 Monitor.Wait 和 Monitor.Pulse 的问题。为了演示我的问题,这是我的服务器代码:

public class Server
{
    TcpListener listener;
    Object sync;
    IHandler handler;
    bool running;

    public Server(IHandler handler, int port)
    {
        this.handler = handler;
        IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];
        listener = new TcpListener(address, port);
        sync = new Object();
        running = false;
    }

    public void Start()
    {
        Thread thread = new Thread(ThreadStart);
        thread.Start();
    }

    public void Stop()
    {
        lock (sync)
        {
            listener.Stop();
            running = false;
            Monitor.Pulse(sync);
        }
    }

    void ThreadStart()
    {
        if (!running)
        {
            listener.Start();
            running = true;
            lock (sync)
            {
                while (running)
                {
                    try
                    {
                        listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                        Monitor.Wait(sync);  // Release lock and wait for a pulse
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e.Message);
                    }
                }
            }
        }
    }

    void Accept(IAsyncResult result)
    {
        // Let the server continue listening
        lock (sync)
        {
            Monitor.Pulse(sync);
        } 

        if (running)
        {
            TcpListener listener = (TcpListener)result.AsyncState;
            using (TcpClient client = listener.EndAcceptTcpClient(result))
            {
                handler.Handle(client.GetStream());
            }
        }
    }
}

这是我的客户端代码:

class Client
{
    class EchoHandler : IHandler
    {
        public void Handle(Stream stream)
        {
            System.Console.Out.Write("Echo Handler: ");
            StringBuilder sb = new StringBuilder();
            byte[] buffer = new byte[1024];
            int count = 0;
            while ((count = stream.Read(buffer, 0, 1024)) > 0)
            {
                sb.Append(Encoding.ASCII.GetString(buffer, 0, count));
            }
            System.Console.Out.WriteLine(sb.ToString());
            System.Console.Out.Flush();
        }
    }

    static IPAddress localhost = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];

    public static int Main()
    {
        Server server1 = new Server(new EchoHandler(), 1000);
        Server server2 = new Server(new EchoHandler(), 1001);

        server1.Start();
        server2.Start();

        Console.WriteLine("Press return to test...");
        Console.ReadLine();

        // Note interleaved ports
        SendMsg("Test1", 1000);
        SendMsg("Test2", 1001);
        SendMsg("Test3", 1000);
        SendMsg("Test4", 1001);
        SendMsg("Test5", 1000);
        SendMsg("Test6", 1001);
        SendMsg("Test7", 1000);

        Console.WriteLine("Press return to terminate...");
        Console.ReadLine();

        server1.Stop();
        server2.Stop();

        return 0;
    }

    public static void SendMsg(String msg, int port)
    {
        IPEndPoint endPoint = new IPEndPoint(localhost, port);

        byte[] buffer = Encoding.ASCII.GetBytes(msg);
        using (Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
        {
            s.Connect(endPoint);
            s.Send(buffer);
        }
    }
}

客户端发送 7 条消息,但服务器只打印 4 条:

按回车测试... 按回车键终止... 回声处理程序:Test1 回声处理程序:Test3 回声处理器:Test2 回声处理程序:Test4

我怀疑监视器会因为在Wait 发生(在ThreadStart 方法中)之前允许Pulse 发生(在服务器的Accept 方法中)而感到困惑,即使ThreadStart 应该仍然拥有sync对象上的锁,直到它调用Monitor.Wait(),然后Accept方法可以获取锁并发送它的Pulse。如果在服务器的Stop()方法中注释掉这两行:

//listener.Stop();
//running = false;

当调用服务器的Stop() 方法时会出现剩余的消息(即唤醒服务器的sync 对象会导致它调度剩余的传入消息)。在我看来,这只能发生在ThreadStartAccept 方法之间的竞争条件下,但sync 对象周围的锁定应该可以防止这种情况发生。

有什么想法吗?

非常感谢, 西蒙。

ps。请注意,我知道输出出现乱序等,我特别询问锁和监视器之间的竞争条件。干杯,SH。

【问题讨论】:

    标签: c# multithreading client-server


    【解决方案1】:

    问题是您正在使用 Pulse/Wait 作为信号。正确的信号,例如 AutoResetEvent 具有这样的状态,即它保持信号状态,直到线程调用 WaitOne()。在没有任何线程等待的情况下调用 Pulse 将成为一个 noop。

    这与同一个线程可以多次获取锁的事实相结合。由于您使用的是异步编程,因此接受回调可以由执行 BeginAcceptTcpClient 的同一线程调用。

    让我来说明一下。我注释掉了第二台服务器,并更改了您服务器上的一些代码。

    void ThreadStart()
    {
        if (!running)
        {
            listener.Start();
            running = true;
            lock (sync)
            {
                while (running)
                {
                    try
                    {
                        Console.WriteLine("BeginAccept [{0}]", 
                            Thread.CurrentThread.ManagedThreadId);
                        listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                        Console.WriteLine("Wait [{0}]", 
                            Thread.CurrentThread.ManagedThreadId);
                        Monitor.Wait(sync);  // Release lock and wait for a pulse
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e.Message);
                    }
                }
            }
        }
    }
    
    void Accept(IAsyncResult result)
    {
        // Let the server continue listening
        lock (sync)
        {
            Console.WriteLine("Pulse [{0}]", 
                Thread.CurrentThread.ManagedThreadId);
            Monitor.Pulse(sync);
        }
        if (running)
        {
            TcpListener localListener = (TcpListener)result.AsyncState;
            using (TcpClient client = localListener.EndAcceptTcpClient(result))
            {
                handler.Handle(client.GetStream());
            }
        }
    }
    

    我运行的输出如下所示。如果您自己运行此代码,则值会有所不同,但通常是相同的。

    Press return to test...
    BeginAccept [3]
    Wait [3]
    
    Press return to terminate...
    Pulse [5]
    BeginAccept [3]
    Pulse [3]
    Echo Handler: Test1
    Echo Handler: Test3
    Wait [3]
    

    如您所见,调用了两个 Pulse,一个来自单独的线程(Pulse [5]),它唤醒第一个等待。线程 3 然后执行另一个 BeginAccept,但有待处理的传入连接,该线程决定立即调用 Accept 回调。由于 Accept 是由同一个线程调用的,因此 Lock(sync) 不会阻塞,而是 Pulse [3] 立即在空线程队列上。

    调用两个处理程序并处理这两个消息。

    一切正常,ThreadStart 再次开始运行并无限期地等待。

    现在,这里的根本问题是您试图将监视器用作信号。因为它不记得第二个 Pulse 丢失的状态。

    但是有一个简单的解决方案。使用 AutoResetEvents,这是一个正确的信号,它会记住它的状态。

    public Server(IHandler handler, int port)
    {
        this.handler = handler;
        IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];
        listener = new TcpListener(address, port);
        running = false;
        _event = new AutoResetEvent(false);
    }
    
    public void Start()
    {
        Thread thread = new Thread(ThreadStart);
        thread.Start();
    }
    
    public void Stop()
    {
        listener.Stop();
        running = false;
        _event.Set();
    }
    
    void ThreadStart()
    {
        if (!running)
        {
            listener.Start();
            running = true;
            while (running)
            {
                try
                {
                    listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                    _event.WaitOne();
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
        }
    }
    
    void Accept(IAsyncResult result)
    {
        // Let the server continue listening
        _event.Set();
        if (running)
        {
            TcpListener localListener = (TcpListener) result.AsyncState;
            using (TcpClient client = localListener.EndAcceptTcpClient(result))
            {
                handler.Handle(client.GetStream());
            }
        }
    }
    

    【讨论】:

    • 谢谢垫子。我假设 BeginAcceptTcpClient 总是在单独的线程上运行,因此我可以将同步对象用作关键部分。你在现场,信号是要走的路。再次感谢。嘘
    猜你喜欢
    • 2013-11-14
    • 2016-10-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-08-26
    • 1970-01-01
    相关资源
    最近更新 更多