【问题标题】:c# Threadpool - limit number of threadsc# Threadpool - 限制线程数
【发布时间】:2012-05-07 16:41:39
【问题描述】:

我正在开发一个控制台应用程序。

我想使用线程池来执行网络下载。这是一些假代码。

 for (int loop=0; loop< 100; loop++)
 {
     ThreadPool.QueueUserWorkItem(new WaitCallback(GetPage), pageList[loop]);
 }


snip

private static void GetPage(object o)
{
    //get the page
}

如何防止我的代码同时启动两个以上(或十个,或其他)线程?

我试过了

    ThreadPool.SetMaxThreads(1, 0);
    ThreadPool.SetMinThreads(1, 0);

但它们似乎没有影响。

【问题讨论】:

    标签: c# multithreading threadpool


    【解决方案1】:

    我会使用Parallel.For 并相应地设置MaxDegreeOfParallelism

    Parallel.For(0, 1000, new ParallelOptions { MaxDegreeOfParallelism = 10 },
      i =>
      {
        GetPage(pageList[i]);
      });
    

    【讨论】:

    • Dataflow 可以达到同样的效果吗?你有小费吗?
    【解决方案2】:

    只需从以下代码反转该代码:

    ThreadPool.SetMaxThreads(1, 0);
    ThreadPool.SetMinThreads(1, 0);
    

    收件人:

    ThreadPool.SetMinThreads(1, 0);
    ThreadPool.SetMaxThreads(1, 0);
    

    不能将 MaxThread 设置为小于 MinThread

    【讨论】:

    • 这是实际问题的答案:如何防止 my 代码启动...
    【解决方案3】:

    就个人而言,我会使用SmartThreadPool 并单独保留线程池。但是,这可能是您想要的:C# thread pool limiting threads

    链接中包含的代码(请注明原作者,而不是我)

    System.Threading.Semaphore S = new System.Threading.Semaphore(3, 3);  
    try 
    {     
      // wait your turn (decrement)     
      S.WaitOne();     
      // do your thing 
    }  
    finally 
    {     
      // release so others can go (increment)     
      S.Release(); 
    } 
    

    【讨论】:

      【解决方案4】:

      说明

      您可以使用ThreadPool.SetMaxThreads 方法来做到这一点。

      但是使用 ThreadPool for WebRequest 存在一些问题。 阅读,例如,this (Bug in ThreadPool or HttpWebRequest?)

      样本

      ThreadPool.SetMaxThreads(2,2);
      

      编辑

      为此,我个人会使用 Linq 的 AsParallel。

      更多信息

      【讨论】:

      • 但是,您需要小心。线程池工作线程通常用于其他目的。例如,在 Web 应用程序中,ThreadPool 工作人员处理 Web 请求。
      • 仅供参考...这限制了 IIS 线程池中的线程总数,因此基本上用户将无法访问您的站点。此代码应位于具有自己的线程池的单独网站上。
      • 为问题添加了更多细节
      • 我相信您报告的错误及其线程与此问题无关。他使用 HttpWebRequest 的方式导致了他的实现问题。
      【解决方案5】:

      查看ThreadPool.SetMaxThreads的参数。第一个参数是工作线程的数量,第二个参数是异步线程的数量,也就是你说的那个。

      在文档的下方,它说:

      您无法设置工作线程数或 I/O 数 完成线程数小于处理器数 在电脑里。

      听起来您正在尝试将 ThreadPool 用于不打算用于的事情。如果您想限制下载量,请创建一个为您管理此问题的类,因为 ThreadPool 不一定是您问题的完整解决方案。

      我建议在 ThreadPool 中启动两个线程并等待回调的类。当它接收到一个线程完成的回调时,会排队一个新的线程。

      【讨论】:

        【解决方案6】:

        如果您对 .Net 2.0 比较严格,您可以使用以下技术:

        知道您将任务排入ThreadPool 的事实是,它将创建一个新线程(当然,如果没有空闲线程),您将在执行此操作之前等待,直到有一个空闲线程。为此目的,使用BlockingCounter 类(如下所述),一旦达到限制,将等待增加,直到有人(另一个线程)减少它。然后它进入“关闭”状态,表示不会执行新的增量并等待完成。

        以下示例显示最多 4 个任务,总数为 10。

        class Program
        {
        
            static int s_numCurrentThreads = 0;
        
            static Random s_rnd = new Random();
        
            static void Main(string[] args)
            {
        
                int maxParallelTasks = 4;
                int totalTasks = 10;
        
                using (BlockingCounter blockingCounter = new BlockingCounter(maxParallelTasks))
                {
                    for (int i = 1; i <= totalTasks; i++)
                    {
        
                        Console.WriteLine("Submitting task {0}", i);
                        blockingCounter.WaitableIncrement();
                        if (!ThreadPool.QueueUserWorkItem((obj) =>
                                                              {
                                                                  try
                                                                  {
                                                                      ThreadProc(obj);
                                                                  }
                                                                  catch (Exception ex)
                                                                  {
                                                                      Console.Error.WriteLine("Task {0} failed: {1}", obj, ex.Message);
                                                                  }
                                                                  finally
                                                                  {
                                                                      // Exceptions are possible here too, 
                                                                      // but proper error handling is not the goal of this sample
                                                                      blockingCounter.WaitableDecrement();
                                                                  }
                                                              }, i))
                        {
                            blockingCounter.WaitableDecrement();
                            Console.Error.WriteLine("Failed to submit task {0} for execution.", i);
                        }
                    }
        
                    Console.WriteLine("Waiting for copmletion...");
                    blockingCounter.CloseAndWait(30000);
                }
        
                Console.WriteLine("Work done!");
                Console.ReadKey();
        
            }
        
            static void ThreadProc (object obj)
            {
                int taskNumber = (int) obj;
                int numThreads = Interlocked.Increment(ref s_numCurrentThreads);
        
                Console.WriteLine("Task {0} started. Total: {1}", taskNumber, numThreads);
                int sleepTime = s_rnd.Next(0, 5);
                Thread.Sleep(sleepTime * 1000);
                Console.WriteLine("Task {0} finished.", taskNumber);
        
                Interlocked.Decrement(ref s_numCurrentThreads);
            }
        

        它使用 BlockingCounter 类,该类基于 Marc Gravell 发布的 here 的 SizeQueue,但没有计数器而不是队列。当你结束排队新线程时调用 Close() 方法,然后等待它完成。

        public class BlockingCounter : IDisposable
        {
            private int m_Count;
            private object m_counterLock = new object();
        
            private bool m_isClosed = false;
            private volatile bool m_isDisposed = false;
        
            private int m_MaxSize = 0;
        
            private ManualResetEvent m_Finished = new ManualResetEvent(false);
        
            public BlockingCounter(int maxSize = 0)
            {
                if (maxSize < 0)
                    throw new ArgumentOutOfRangeException("maxSize");
                m_MaxSize = maxSize;
            }
        
        
            public void WaitableIncrement(int timeoutMs = Timeout.Infinite)
            {
                lock (m_counterLock)
                {
                    while (m_MaxSize > 0 && m_Count >= m_MaxSize)
                    {
                        CheckClosedOrDisposed();
                        if (!Monitor.Wait(m_counterLock, timeoutMs))
                            throw new TimeoutException("Failed to wait for counter to decrement.");
                    }
        
                    CheckClosedOrDisposed();
                    m_Count++;
        
                    if (m_Count == 1)
                    {
                        Monitor.PulseAll(m_counterLock);
                    }
        
                }
            }
        
            public void WaitableDecrement(int timeoutMs = Timeout.Infinite)
            {
                lock (m_counterLock)
                {
                    try
                    {
                        while (m_Count == 0)
                        {
                            CheckClosedOrDisposed();
                            if (!Monitor.Wait(m_counterLock, timeoutMs))
                                throw new TimeoutException("Failed to wait for counter to increment.");
                        }
        
                        CheckDisposed();
        
                        m_Count--;
        
                        if (m_MaxSize == 0 || m_Count == m_MaxSize - 1)
                            Monitor.PulseAll(m_counterLock);
                    }
                    finally
                    {
                        if (m_isClosed && m_Count == 0)
                            m_Finished.Set();
                    }
                }
            }
        
            void CheckClosedOrDisposed()
            {
                if (m_isClosed)
                    throw new Exception("The counter is closed");
                CheckDisposed();
            }
        
            void CheckDisposed()
            {
                if (m_isDisposed)
                    throw new ObjectDisposedException("The counter has been disposed.");
            }
        
            public void Close()
            {
                lock (m_counterLock)
                {
                    CheckDisposed();
                    m_isClosed = true;
                    Monitor.PulseAll(m_counterLock);
                }
            }
        
            public bool WaitForFinish(int timeoutMs = Timeout.Infinite)
            {
                CheckDisposed();
                lock (m_counterLock)
                { 
                     if (m_Count == 0)
                         return true;
                }
                return m_Finished.WaitOne(timeoutMs);
            }
        
            public void CloseAndWait (int timeoutMs = Timeout.Infinite)
            {
                Close();
                WaitForFinish(timeoutMs);
            }
        
            public void Dispose()
            {
                if (!m_isDisposed)
                {
                    m_isDisposed = true;
                    lock (m_counterLock)
                    {
                        // Wake up all waiting threads, so that they know the object 
                        // is disposed and there's nothing to wait anymore
                        Monitor.PulseAll(m_counterLock);
                    }
                    m_Finished.Close();
                }
            }
        }
        

        结果会是这样的:

        Submitting task 1
        Submitting task 2
        Submitting task 3
        Submitting task 4
        Submitting task 5
        Task 1 started. Total: 1
        Task 1 finished.
        Task 3 started. Total: 1
        Submitting task 6
        Task 2 started. Total: 2
        Task 3 finished.
        Task 6 started. Total: 4
        Task 5 started. Total: 3
        Task 4 started. Total: 4
        Submitting task 7
        Task 4 finished.
        Submitting task 8
        Task 7 started. Total: 4
        Task 5 finished.
        Submitting task 9
        Task 7 finished.
        Task 8 started. Total: 4
        Task 9 started. Total: 4
        Submitting task 10
        Task 2 finished.
        Waiting for copmletion...
        Task 10 started. Total: 4
        Task 10 finished.
        Task 6 finished.
        Task 8 finished.
        Task 9 finished.
        Work done!
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2011-05-12
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2013-06-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多