【问题标题】:Is there a better way to wait for queued threads?有没有更好的方法来等待排队的线程?
【发布时间】:2010-11-05 23:15:06
【问题描述】:

有没有更好的方法在执行另一个进程之前等待排队的线程?

目前我在做:

this.workerLocker = new object(); // Global variable
this.RunningWorkers = arrayStrings.Length; // Global variable

// Initiate process

foreach (string someString in arrayStrings)
{
     ThreadPool.QueueUserWorkItem(this.DoSomething, someString);
     Thread.Sleep(100);
}

// Waiting execution for all queued threads
lock (this.workerLocker)  // Global variable (object)
{
     while (this.RunningWorkers > 0)
     {
          Monitor.Wait(this.workerLocker);
     }
}

// Do anything else    
Console.WriteLine("END");

// Method DoSomething() definition
public void DoSomething(object data)
{
    // Do a slow process...
    .
    .
    .

    lock (this.workerLocker)
    {
        this.RunningWorkers--;
        Monitor.Pulse(this.workerLocker);
    }
}

【问题讨论】:

  • 有很多方法。我认为您需要阅读良好的线程指南-albahari.com/threading。生产者消费者队列在您的场景中可能很有用,Wait and pulse 或 WaitHandles 也可能有用。 ——
  • 这一行存在竞态条件:this.RunningWorkers--;尝试使用 Interlocked.Decrement 方法进行线程安全减量。
  • @SolutionYogi: RunningWorkers 受锁保护(this.workerLocker)

标签: c# multithreading queue


【解决方案1】:

我不确定是否真的存在,我最近做了类似的事情来扫描子网的每个 IP 以接受特定端口。

我可以提出一些可以提高性能的建议:

  • 使用 ThreadPool 的 SetMaxThreads 方法来调整性能(即平衡同时运行大量线程与如此大量线程上的锁定时间)。

  • 在设置所有工作项时不要休眠,没有真正的需要(我立即意识到这一点。但是,请在 DoSomething 方法中休眠,可能只有一毫秒,以允许其他线程如果需要,可以跳进去。

我相信您可以自己实现一个更加自定义的方法,但我怀疑它会比使用 ThreadPool 更有效。

P.S 我不是 100% 清楚使用监视器的原因,因为您无论如何都要锁定?请注意,问这个问题只是因为我以前没有使用过 Monitor 类,而不是因为我实际上怀疑它的用途。

【讨论】:

    【解决方案2】:

    .NET 4.0 有一个新的类,Barrier

    除此之外,您的方法还不错,如果 RunningWorkers 在递减后为​​ 0,您可以只通过脉冲进行一点优化。看起来像:

    this.workerLocker = new object(); // Global variable
    this.RunningWorkers = arrayStrings.Length; // Global variable
    
    foreach (string someString in arrayStrings)
    {
         ThreadPool.QueueUserWorkItem(this.DoSomething, someString);
         //Thread.Sleep(100);
    }
    
    // Waiting execution for all queued threads
    Monitor.Wait(this.workerLocker);
    

    // Method DoSomething() definition
    public void DoSomething(object data)
    {
        // Do a slow process...
        // ...
    
        lock (this.workerLocker)
        {
            this.RunningWorkers--;
            if (this.RunningWorkers == 0)
               Monitor.Pulse(this.workerLocker);
        }
    }
    

    您可以使用 EventWaihandle 或 AutoResetEvent,但它们都是封装的 Win32 API。由于 Monitor 类是纯托管代码,因此在这种情况下我更喜欢 Monitor。

    【讨论】:

    • 我同意 Henk 的观点。您的方法是一种相对简单的方法,可以确保您只有所需的工作量,尽管我会显式地初始化您的 RunningWorkers 变量,而不是通过全局方式。基本思想是合理的,只有当所有都完成时你才应该脉搏,而不是每次都完成。有关在 C# 中执行此操作的更多方法,请参阅 albahari.com/threading。您可能更喜欢其中的一种方式。
    • 使用 Interlocked 类而不是 ++,--.
    • modosansreves: 没必要,RunningWorkers 受 workerLocker 上的锁保护。
    【解决方案3】:

    您可能想看看 AutoResetEvent 和 ManualResetEvent。

    这些正是针对这种情况的(在执行“某事”之前等待 ThreadPool 线程完成)。

    你会这样做:

    static void Main(string[] args)
    {
        List<ManualResetEvent> resetEvents = new List<ManualResetEvent>();
        foreach (var x in Enumerable.Range(1, WORKER_COUNT))
        {
            ManualResetEvent resetEvent = new ManualResetEvent();
            ThreadPool.QueueUserWorkItem(DoSomething, resetEvent);
            resetEvents.Add(resetEvent);
        }
    
        // wait for all ManualResetEvents
        WaitHandle.WaitAll(resetEvents.ToArray()); // You probably want to use an array instead of a List, a list was just easier for the example :-)
    }
    
    public static void DoSomething(object data)
    {
        ManualResetEvent resetEvent = data as ManualResetEvent;
    
        // Do something
    
        resetEvent.Set();
    }
    

    编辑:忘了提你可以等待一个线程,任何线程等等。 此外,根据您的情况,AutoResetEvent 可以稍微简化一些事情,因为它(顾名思义)可以自动发出事件信号:-)

    【讨论】:

    • 这行不通,因为您在 Windows 上拆分了超过 64 个项目,只允许您等待最多 64 个句柄。
    • 谢谢肖恩,我不知道这一点(这对我当前的项目很重要)
    • 您忘记将resetEvent 附加到resetEvents
    • 盲人 - 不知道你的意思?肖恩-谢谢,我也不知道,但是是的,这可能非常重要。您是否碰巧知道它是否仅在 32 位 Windows 中是一个问题?
    • 我不知道为什么人们不投票给 Marc 的“Fork”课程。没有上面代码64个句柄的限制,非常干净。
    【解决方案4】:

    当我不得不等待任务完成时,我真的很喜欢Begin- End- Async Pattern

    我建议您将 BeginEnd 包装在工人类中:

    public class StringWorker
    {
        private string m_someString;
        private IAsyncResult m_result;
    
        private Action DoSomethingDelegate;
    
        public StringWorker(string someString)
        {
            DoSomethingDelegate = DoSomething;
        }
    
        private void DoSomething()
        {
            throw new NotImplementedException();
        }
    
        public IAsyncResult BeginDoSomething()
        {
            if (m_result != null) { throw new InvalidOperationException(); }
            m_result = DoSomethingDelegate.BeginInvoke(null, null);
            return m_result;
        }
    
        public void EndDoSomething()
        {
            DoSomethingDelegate.EndInvoke(m_result);
        }
    }
    

    要开始和工作,请使用以下代码 sn-p:

    List<StringWorker> workers = new List<StringWorker>();
    
    foreach (var someString in arrayStrings)
    {
        StringWorker worker = new StringWorker(someString);
        worker.BeginDoSomething();
        workers.Add(worker);
    }
    
    foreach (var worker in workers)
    {
        worker.EndDoSomething();
    }
    
    Console.WriteLine("END");
    

    就是这样。

    旁注:如果您想从 BeginEnd 返回结果,请将“Action”更改为 Func,并将 EndDoSomething 更改为返回类型。

    public class StringWorker
    {
        private string m_someString;
        private IAsyncResult m_result;
    
        private Func<string> DoSomethingDelegate;
    
        public StringWorker(string someString)
        {
            DoSomethingDelegate = DoSomething;
        }
    
        private string DoSomething()
        {
            throw new NotImplementedException();
        }
    
        public IAsyncResult BeginDoSomething()
        {
            if (m_result != null) { throw new InvalidOperationException(); }
            m_result = DoSomethingDelegate.BeginInvoke(null, null);
            return m_result;
        }
    
        public string EndDoSomething()
        {
            return DoSomethingDelegate.EndInvoke(m_result);
        }
    }
    

    【讨论】:

      【解决方案5】:

      只使用MonitorForkJoin 怎么样;-p

      Forker p = new Forker();
      foreach (var obj in collection)
      {
          var tmp = obj;
          p.Fork(delegate { DoSomeWork(tmp); });
      }
      p.Join();
      

      earlier answer 上显示的完整代码。

      或者对于具有上限大小(线程安全等)的生产者/消费者队列,here

      【讨论】:

      • 我真的很喜欢这个特别的想法。我正在将它合并到我的迷你线程库中。谢谢!
      【解决方案6】:

      是的,有。

      建议的方法

      1) 一个计数器和一个等待句柄

      int ActiveCount = 1; // 1 (!) is important
      EventWaitHandle ewhAllDone = new EventWaitHandle(false, ResetMode.Manual);
      

      2) 添加循环

      foreach (string someString in arrayStrings)
      {
           Interlocked.Increment(ref ActiveCount);
      
           ThreadPool.QueueUserWorkItem(this.DoSomething, someString);
           // Thread.Sleep(100); // you really need this sleep ?
      }
      
      PostActionCheck();
      ewhAllDone.Wait();
      

      3) DoSomething 应该是这样的

      {
          try
          {
              // some long executing code
          }
          finally
          {
              // ....
              PostActionCheck();
          }
      } 
      

      4) PostActionCheck 在哪里

      void PostActionCheck()
      {
          if (Interlocked.Decrement(ref ActiveCount) == 0)
              ewhAllDone.Set();
      }
      

      想法

      ActiveCount 用1 初始化,然后递增n 次。

      PostActionCheck 被称为n + 1 次。最后一个会触发事件。

      此解决方案的好处是使用单个内核对象(它是一个事件)和轻量级 API 的 2 * n + 1 调用。 (可以少一点吗?)

      附言

      我在这里写了代码,我可能拼错了一些类名。

      【讨论】:

      • 这里的好处是您只需要 1 个 EWH,但 Monitor 仍然更可取。 EWH 的开销可以轻松抵消互锁的少量节省。
      【解决方案7】:

      除了 Henk Holterman 指出的 Barrier(顺便说一句,他对 Barrier 的使用非常不好,请参阅我对他的回答的评论),.NET 4.0 提供了一大堆其他选项(到在 .NET 3.5 中使用它们,您需要下载 an extra DLL from Microsoft)。我写了一篇博文that lists them all,但我最喜欢的绝对是Parallel.ForEach:

      Parallel.ForEach<string>(arrayStrings, someString =>
      {
          DoSomething(someString);
      });
      

      在幕后,Parallel.ForEach 排队到新的和改进的线程池,并等待所有线程完成。

      【讨论】:

        【解决方案8】:

        使用 Spring 线程。它内置了 Barrier 实现。

        http://www.springsource.org/extensions/se-threading-net

        【讨论】:

          猜你喜欢
          • 2012-03-23
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2010-12-08
          • 2011-01-01
          • 2013-08-10
          • 2020-12-11
          • 2019-12-08
          相关资源
          最近更新 更多