【问题标题】:Multi-threading with .Net HttpListener.Net HttpListener 的多线程
【发布时间】:2011-06-08 00:07:39
【问题描述】:

我有一个听众:

listener = new HttpListener();
listener.Prefixes.Add(@"http://+:8077/");
listener.Start();
listenerThread = new Thread(HandleRequests);
listenerThread.Start();

我正在处理请求:

private void HandleRequests()
{
    while (listener.IsListening)
    {
        var context = listener.BeginGetContext(new AsyncCallback(ListenerCallback), listener);
        context.AsyncWaitHandle.WaitOne();
    }
}

private void ListenerCallback(IAsyncResult ar)
{
    var listener = ar.AsyncState as HttpListener;

    var context = listener.EndGetContext(ar);

    //do some stuff
}

我想这样写void Stop()

  1. 它将一直阻塞,直到所有当前处理的请求结束(即等待所有线程“做一些事情”)。
  2. 虽然它会等待已经开始的请求,但它不会允许更多的请求(即在ListenerCallback 的开头返回)。
  3. 之后它将调用listener.Stop()listener.IsListening 变为假)。

怎么写?

编辑:您如何看待这个解决方案?安全吗?

public void Stop() 
{
    lock (this)
    {
        isStopping = true;
    }
    resetEvent.WaitOne(); //initially set to true
    listener.Stop();
}

private void ListenerCallback(IAsyncResult ar)
{
    lock (this)
    {
        if (isStopping)
            return;

        resetEvent.Reset();
        numberOfRequests++;
    }

    var listener = ar.AsyncState as HttpListener;

    var context = listener.EndGetContext(ar);

    //do some stuff

    lock (this)
    {
        if (--numberOfRequests == 0)
            resetEvent.Set();
    }
}

【问题讨论】:

    标签: c# .net multithreading httplistener


    【解决方案1】:

    为了完整起见,如果您管理自己的工作线程,如下所示:

    class HttpServer : IDisposable
    {
        private readonly HttpListener _listener;
        private readonly Thread _listenerThread;
        private readonly Thread[] _workers;
        private readonly ManualResetEvent _stop, _ready;
        private Queue<HttpListenerContext> _queue;
    
        public HttpServer(int maxThreads)
        {
            _workers = new Thread[maxThreads];
            _queue = new Queue<HttpListenerContext>();
            _stop = new ManualResetEvent(false);
            _ready = new ManualResetEvent(false);
            _listener = new HttpListener();
            _listenerThread = new Thread(HandleRequests);
        }
    
        public void Start(int port)
        {
            _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port));
            _listener.Start();
            _listenerThread.Start();
    
            for (int i = 0; i < _workers.Length; i++)
            {
                _workers[i] = new Thread(Worker);
                _workers[i].Start();
            }
        }
    
        public void Dispose()
        { Stop(); }
    
        public void Stop()
        {
            _stop.Set();
            _listenerThread.Join();
            foreach (Thread worker in _workers)
                worker.Join();
            _listener.Stop();
        }
    
        private void HandleRequests()
        {
            while (_listener.IsListening)
            {
                var context = _listener.BeginGetContext(ContextReady, null);
    
                if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle }))
                    return;
            }
        }
    
        private void ContextReady(IAsyncResult ar)
        {
            try
            {
                lock (_queue)
                {
                    _queue.Enqueue(_listener.EndGetContext(ar));
                    _ready.Set();
                }
            }
            catch { return; }
        }
    
        private void Worker()
        {
            WaitHandle[] wait = new[] { _ready, _stop };
            while (0 == WaitHandle.WaitAny(wait))
            {
                HttpListenerContext context;
                lock (_queue)
                {
                    if (_queue.Count > 0)
                        context = _queue.Dequeue();
                    else
                    {
                        _ready.Reset();
                        continue;
                    }
                }
    
                try { ProcessRequest(context); }
                catch (Exception e) { Console.Error.WriteLine(e); }
            }
        }
    
        public event Action<HttpListenerContext> ProcessRequest;
    }
    

    【讨论】:

    • 这太棒了 - 它是测试 HttpListener 吞吐量的绝佳候选者。
    • 非常感谢您提供的那段代码!有两个小问题:1. ProcessRequest 可能为 null 2. HttpListenerContext 不是线程安全的,除非它是静态的
    • @MartinMeeser 感谢您的评论。 1. 我们可以使用ProcessRequest?.Invoke(context);,而不是将其包装在 try catch 块中。对于 2. 但是,如果静态不是一个选项,您有什么建议?
    • @JohnTube 对于上面的#1 或#2,这里没有问题。继续前进。
    • @topdog 当然 AutoResetEvent 是从等待池中释放线程时更常见的方法。这只是个人喜好,我不是他们的粉丝。
    【解决方案2】:

    有几种方法可以解决这个问题...这是一个简单的示例,它使用信号量来跟踪正在进行的工作,并在所有工作人员完成时发出信号。这应该会给你一个基本的工作思路。

    下面的解决方案并不理想,理想情况下我们应该在调用 BeginGetContext 之前获取信号量。这使得关机更加困难,所以我选择使用这种更简化的方法。如果我是“真正地”这样做,我可能会编写自己的线程管理,而不是依赖线程池。这将允许更可靠的关机。

    无论如何这里是完整的例子:

    class TestHttp
    {
        static void Main()
        {
            using (HttpServer srvr = new HttpServer(5))
            {
                srvr.Start(8085);
                Console.WriteLine("Press [Enter] to quit.");
                Console.ReadLine();
            }
        }
    }
    
    
    class HttpServer : IDisposable
    {
        private readonly int _maxThreads;
        private readonly HttpListener _listener;
        private readonly Thread _listenerThread;
        private readonly ManualResetEvent _stop, _idle;
        private readonly Semaphore _busy;
    
        public HttpServer(int maxThreads)
        {
            _maxThreads = maxThreads;
            _stop = new ManualResetEvent(false);
            _idle = new ManualResetEvent(false);
            _busy = new Semaphore(maxThreads, maxThreads);
            _listener = new HttpListener();
            _listenerThread = new Thread(HandleRequests);
        }
    
        public void Start(int port)
        {
            _listener.Prefixes.Add(String.Format(@"http://+:{0}/", port));
            _listener.Start();
            _listenerThread.Start();
        }
    
        public void Dispose()
        { Stop(); }
    
        public void Stop()
        {
            _stop.Set();
            _listenerThread.Join();
            _idle.Reset();
    
            //aquire and release the semaphore to see if anyone is running, wait for idle if they are.
            _busy.WaitOne();
            if(_maxThreads != 1 + _busy.Release())
                _idle.WaitOne();
    
            _listener.Stop();
        }
    
        private void HandleRequests()
        {
            while (_listener.IsListening)
            {
                var context = _listener.BeginGetContext(ListenerCallback, null);
    
                if (0 == WaitHandle.WaitAny(new[] { _stop, context.AsyncWaitHandle }))
                    return;
            }
        }
    
        private void ListenerCallback(IAsyncResult ar)
        {
            _busy.WaitOne();
            try
            {
                HttpListenerContext context;
                try
                { context = _listener.EndGetContext(ar); }
                catch (HttpListenerException)
                { return; }
    
                if (_stop.WaitOne(0, false))
                    return;
    
                Console.WriteLine("{0} {1}", context.Request.HttpMethod, context.Request.RawUrl);
                context.Response.SendChunked = true;
                using (TextWriter tw = new StreamWriter(context.Response.OutputStream))
                {
                    tw.WriteLine("<html><body><h1>Hello World</h1>");
                    for (int i = 0; i < 5; i++)
                    {
                        tw.WriteLine("<p>{0} @ {1}</p>", i, DateTime.Now);
                        tw.Flush();
                        Thread.Sleep(1000);
                    }
                    tw.WriteLine("</body></html>");
                }
            }
            finally
            {
                if (_maxThreads == 1 + _busy.Release())
                    _idle.Set();
            }
        }
    }
    

    【讨论】:

      【解决方案3】:

      我在问题的编辑部分查阅了我的代码,我决定接受它并进行一些修改:

      public void Stop() 
      {
          lock (locker)
          {
              isStopping = true;
          }
          resetEvent.WaitOne(); //initially set to true
          listener.Stop();
      }
      
      private void ListenerCallback(IAsyncResult ar)
      {
          lock (locker) //locking on this is a bad idea, but I forget about it before
          {
              if (isStopping)
                  return;
      
              resetEvent.Reset();
              numberOfRequests++;
          }
      
          try
          {
              var listener = ar.AsyncState as HttpListener;
      
              var context = listener.EndGetContext(ar);
      
              //do some stuff
          }
          finally //to make sure that bellow code will be executed
          {
              lock (locker)
              {
                  if (--numberOfRequests == 0)
                      resetEvent.Set();
              }
          }
      }
      

      【讨论】:

        【解决方案4】:

        只需调用 listener.Stop() 就可以了。这不会终止任何已经建立的连接,但会阻止任何新的连接。

        【讨论】:

        • 这不是真的。如果您在执行ListenerCallback 期间调用listener.Stop(),您将收到异常,例如。当调用EndGetContext 甚至更晚,当使用输出流时。我当然可以捕获异常,但我不想这样做。
        • 在我的代码中,我使用了一个标志,并且在对其调用 stop 之后不再引用侦听器,但是关闭侦听器并不会关闭已经接受的连接,而只是关闭侦听器。
        • 我不知道你说的“我使用旗帜”是什么意思。问题是,在ListenerCallback 中,我正在使用侦听器,如果另一个线程关闭它,而我正在使用它,我最终会遇到我提到的异常。
        • 在您的 Stop 方法中,如果您获取 _busy 信号量然后调用 listener.Stop 怎么样?这应该允许任何挂起的 ListenerCallback 调用在您销毁侦听器对象之前完成。
        【解决方案5】:

        这使用 BlockingCollection 类型的队列来服务请求。可以直接使用。您应该从该类派生一个类并覆盖 Response。

        using System;
        using System.Collections.Concurrent;
        using System.Net;
        using System.Text;
        using System.Threading;
        
        namespace Service
        {
            class HttpServer : IDisposable
            {
                private HttpListener httpListener;
                private Thread listenerLoop;
                private Thread[] requestProcessors;
                private BlockingCollection<HttpListenerContext> messages;
        
                public HttpServer(int threadCount)
                {
                    requestProcessors = new Thread[threadCount];
                    messages = new BlockingCollection<HttpListenerContext>();
                    httpListener = new HttpListener();
                }
        
                public virtual int Port { get; set; } = 80;
        
                public virtual string[] Prefixes
                {
                    get { return new string[] {string.Format(@"http://+:{0}/", Port )}; }
                }
        
                public void Start(int port)
                {
                    listenerLoop = new Thread(HandleRequests);
        
                    foreach( string prefix in Prefixes ) httpListener.Prefixes.Add( prefix );
        
                    listenerLoop.Start();
        
                    for (int i = 0; i < requestProcessors.Length; i++)
                    {
                        requestProcessors[i] = StartProcessor(i, messages);
                    }
                }
        
                public void Dispose() { Stop(); }
        
                public void Stop()
                {
                    messages.CompleteAdding();
        
                    foreach (Thread worker in requestProcessors) worker.Join();
        
                    httpListener.Stop();
                    listenerLoop.Join();
                }
        
                private void HandleRequests()
                {
                    httpListener.Start();
                    try 
                    {
                        while (httpListener.IsListening)
                        {
                            Console.WriteLine("The Linstener Is Listening!");
                            HttpListenerContext context = httpListener.GetContext();
        
                            messages.Add(context);
                            Console.WriteLine("The Linstener has added a message!");
                        }
                    }
                    catch(Exception e)
                    {
                        Console.WriteLine (e.Message);
                    }
                }
        
                private Thread StartProcessor(int number, BlockingCollection<HttpListenerContext> messages)
                {
                    Thread thread = new Thread(() => Processor(number, messages));
                    thread.Start();
                    return thread;
                }
        
                private void Processor(int number, BlockingCollection<HttpListenerContext> messages)
                {
                    Console.WriteLine ("Processor {0} started.", number);
                    try
                    {
                        for (;;)
                        {
                            Console.WriteLine ("Processor {0} awoken.", number);
                            HttpListenerContext context = messages.Take();
                            Console.WriteLine ("Processor {0} dequeued message.", number);
                            Response (context);
                        }
                    } catch { }
        
                    Console.WriteLine ("Processor {0} terminated.", number);
                }
        
                public virtual void Response(HttpListenerContext context)
                {
                    SendReply(context, new StringBuilder("<html><head><title>NULL</title></head><body>This site not yet implementd.</body></html>") );
                }
        
                public static void SendReply(HttpListenerContext context, StringBuilder responseString )
                {
                    byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseString.ToString());
                    context.Response.ContentLength64 = buffer.Length;
                    System.IO.Stream output = context.Response.OutputStream;
                    output.Write(buffer, 0, buffer.Length);
                    output.Close();
                }
            }
        }
        

        这是一个如何使用它的示例。无需使用事件或任何锁块。 BlockingCollection 解决了所有这些问题。

        using System;
        using System.Collections.Concurrent;
        using System.IO;
        using System.Net;
        using System.Text;
        using System.Threading;
        
        namespace Service
        {
          class Server
          {
            public static void Main (string[] args)
            {
                HttpServer Service = new QuizzServer (8);
                Service.Start (80);
                for (bool coninute = true; coninute ;)
                {
                    string input = Console.ReadLine ().ToLower();
                    switch (input)
                    {
                        case "stop":
                            Console.WriteLine ("Stop command accepted.");
                            Service.Stop ();
                            coninute = false;
                            break;
                        default:
                            Console.WriteLine ("Unknown Command: '{0}'.",input);
                            break;
                    }
                }
            }
          }
        }
        

        【讨论】:

          猜你喜欢
          • 2021-04-09
          • 1970-01-01
          • 2011-08-13
          • 1970-01-01
          • 2010-10-02
          • 1970-01-01
          • 1970-01-01
          • 2013-10-09
          • 2013-11-14
          相关资源
          最近更新 更多