【问题标题】:Wait for pooled threads to complete等待池线程完成
【发布时间】:2010-10-16 19:15:57
【问题描述】:

对于一个多余的问题,我深表歉意。但是,我找到了许多解决问题的方法,但没有一个得到很好的解释。我希望它会在这里说清楚。

我的 C# 应用程序的主线程使用 ThreadPool 生成 1..n 个后台工作程序。我希望原始线程锁定,直到所有工作人员都完成。我特别研究了 ManualResetEvent,但我不清楚它的用途。

在伪中:

foreach( var o in collection )
{
  queue new worker(o);
}

while( workers not completed ) { continue; }

如有必要,我会提前知道即将排队的工人数量。

【问题讨论】:

标签: c# .net multithreading


【解决方案1】:

首先,工人执行多长时间?池线程通常应该用于短期任务 - 如果它们要运行一段时间,请考虑手动线程。

重新解决问题;你真的需要阻塞主线程吗?你可以用回调代替吗?如果是这样,类似:

int running = 1; // start at 1 to prevent multiple callbacks if
          // tasks finish faster than they are started
Action endOfThread = delegate {
    if(Interlocked.Decrement(ref running) == 0) {
        // ****run callback method****
    }
};
foreach(var o in collection)
{
    var tmp = o; // avoid "capture" issue
    Interlocked.Increment(ref running);
    ThreadPool.QueueUserWorkItem(delegate {
        DoSomeWork(tmp); // [A] should handle exceptions internally
        endOfThread();
    });
}
endOfThread(); // opposite of "start at 1"

这是一种相当轻量级(无操作系统原语)跟踪工作人员的方法。

如果您需要阻止,您可以使用Monitor 执行相同操作(同样,避免使用操作系统对象):

    object syncLock = new object();
    int running = 1;
    Action endOfThread = delegate {
        if (Interlocked.Decrement(ref running) == 0) {
            lock (syncLock) {
                Monitor.Pulse(syncLock);
            }
        }
    };
    lock (syncLock) {
        foreach (var o in collection) {
            var tmp = o; // avoid "capture" issue
            ThreadPool.QueueUserWorkItem(delegate
            {
                DoSomeWork(tmp); // [A] should handle exceptions internally
                endOfThread();
            });
        }
        endOfThread();
        Monitor.Wait(syncLock);
    }
    Console.WriteLine("all done");

【讨论】:

  • 如果其中一个代表抛出异常,您的代码将无限等待。
  • 如果其中一个代表抛出异常,我将失去整个过程,所以这是相当随意的......我假设它不会抛出,但我会做它明确的;-p
  • worker 将处理昂贵的操作,包括读取和写入文件以及执行涉及 Binary/Image 列的 SQL 选择和插入。它们的寿命不太可能需要显式线程,但可以通过让它们并行执行来获得性能。
  • +1,要处理工作进程中的异常,您可以尝试 { DoSomeWork(tmp); } 最后 { endOfThread(); }
  • @Marc,ThreadPool 异常是否会杀死进程并不确定。它在 CLR 的 1.0 和 2.0 版本之间发生了变化(我相信它也是可配置的)说实话我不记得哪个版本做了哪个了。我只是假设线程最糟糕:)
【解决方案2】:

试试这个。该函数接受一个动作委托列表。它将为列表中的每个项目添加一个 ThreadPool 工作人员条目。它会等待每个动作完成后再返回。

public static void SpawnAndWait(IEnumerable<Action> actions)
{
    var list = actions.ToList();
    var handles = new ManualResetEvent[actions.Count()];
    for (var i = 0; i < list.Count; i++)
    {
        handles[i] = new ManualResetEvent(false);
        var currentAction = list[i];
        var currentHandle = handles[i];
        Action wrappedAction = () => { try { currentAction(); } finally { currentHandle.Set(); } };
        ThreadPool.QueueUserWorkItem(x => wrappedAction());
    }

    WaitHandle.WaitAll(handles);
}

【讨论】:

  • 如果句柄数大于系统允许的数量,WaitHandle.WaitAll 将失败。在我的 Win2k3 服务器上,这个数字是 64,所以当我尝试生成超过 64 个项目时出现异常......
  • @Eran,尝试编写一个 SpawAndWaitHelper,它基本上具有上述代码。使用 SpawAndWait 将 enumerable 分成 64 个大小的块,并为每个块调用 helper。
  • 这段代码存在严重错误。由于包装器操作是延迟评估的,因此执行包装器操作的第一个线程可能会获取 2nd 、 3rd 等句柄而不是第一个。
  • 我不明白这个严重的错误是如何发生的。据我了解,在分配 WrappedAction 时,currentAction 值已被捕获,保存在此 WrappedAction 下。它是懒惰地评估的,但值 WrappedAction/currentAction 无论如何都不会改变。
【解决方案3】:

我认为您在 ManualResetEvent 方面走在了正确的轨道上。这个link 有一个与您尝试做的非常匹配的代码示例。关键是使用 WaitHandle.WaitAll 并传递一个等待事件数组。每个线程都需要设置这些等待事件之一。

   // Simultaneously calculate the terms.
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateBase));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateFirstTerm));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateSecondTerm));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateThirdTerm));

    // Wait for all of the terms to be calculated.
    WaitHandle.WaitAll(autoEvents);

    // Reset the wait handle for the next calculation.
    manualEvent.Reset();

编辑:

确保在您的工作线程代码路径中设置了事件(即 autoEvents1.Set();)。一旦它们都收到信号,waitAll 就会返回。

void CalculateSecondTerm(object stateInfo)
{
    double preCalc = randomGenerator.NextDouble();
    manualEvent.WaitOne();
    secondTerm = preCalc * baseNumber * 
        randomGenerator.NextDouble();
    autoEvents[1].Set();
}

【讨论】:

    【解决方案4】:

    这是一种不同的方法 - 封装;所以你的代码可以很简单:

        Forker p = new Forker();
        foreach (var obj in collection)
        {
            var tmp = obj;
            p.Fork(delegate { DoSomeWork(tmp); });
        }
        p.Join();
    

    Forker 类在下面给出(我在火车上感到无聊;-p)...再次,这避免了 OS 对象,但将事情包装得很整齐(IMO):

    using System;
    using System.Threading;
    
    /// <summary>Event arguments representing the completion of a parallel action.</summary>
    public class ParallelEventArgs : EventArgs
    {
        private readonly object state;
        private readonly Exception exception;
        internal ParallelEventArgs(object state, Exception exception)
        {
            this.state = state;
            this.exception = exception;
        }
    
        /// <summary>The opaque state object that identifies the action (null otherwise).</summary>
        public object State { get { return state; } }
    
        /// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary>
        public Exception Exception { get { return exception; } }
    }
    
    /// <summary>Provides a caller-friendly wrapper around parallel actions.</summary>
    public sealed class Forker
    {
        int running;
        private readonly object joinLock = new object(), eventLock = new object();
    
        /// <summary>Raised when all operations have completed.</summary>
        public event EventHandler AllComplete
        {
            add { lock (eventLock) { allComplete += value; } }
            remove { lock (eventLock) { allComplete -= value; } }
        }
        private EventHandler allComplete;
        /// <summary>Raised when each operation completes.</summary>
        public event EventHandler<ParallelEventArgs> ItemComplete
        {
            add { lock (eventLock) { itemComplete += value; } }
            remove { lock (eventLock) { itemComplete -= value; } }
        }
        private EventHandler<ParallelEventArgs> itemComplete;
    
        private void OnItemComplete(object state, Exception exception)
        {
            EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock
            if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception));
            if (Interlocked.Decrement(ref running) == 0)
            {
                EventHandler allHandler = allComplete; // don't need to lock
                if (allHandler != null) allHandler(this, EventArgs.Empty);
                lock (joinLock)
                {
                    Monitor.PulseAll(joinLock);
                }
            }
        }
    
        /// <summary>Adds a callback to invoke when each operation completes.</summary>
        /// <returns>Current instance (for fluent API).</returns>
        public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler)
        {
            if (handler == null) throw new ArgumentNullException("handler");
            ItemComplete += handler;
            return this;
        }
    
        /// <summary>Adds a callback to invoke when all operations are complete.</summary>
        /// <returns>Current instance (for fluent API).</returns>
        public Forker OnAllComplete(EventHandler handler)
        {
            if (handler == null) throw new ArgumentNullException("handler");
            AllComplete += handler;
            return this;
        }
    
        /// <summary>Waits for all operations to complete.</summary>
        public void Join()
        {
            Join(-1);
        }
    
        /// <summary>Waits (with timeout) for all operations to complete.</summary>
        /// <returns>Whether all operations had completed before the timeout.</returns>
        public bool Join(int millisecondsTimeout)
        {
            lock (joinLock)
            {
                if (CountRunning() == 0) return true;
                Thread.SpinWait(1); // try our luck...
                return (CountRunning() == 0) ||
                    Monitor.Wait(joinLock, millisecondsTimeout);
            }
        }
    
        /// <summary>Indicates the number of incomplete operations.</summary>
        /// <returns>The number of incomplete operations.</returns>
        public int CountRunning()
        {
            return Interlocked.CompareExchange(ref running, 0, 0);
        }
    
        /// <summary>Enqueues an operation.</summary>
        /// <param name="action">The operation to perform.</param>
        /// <returns>The current instance (for fluent API).</returns>
        public Forker Fork(ThreadStart action) { return Fork(action, null); }
    
        /// <summary>Enqueues an operation.</summary>
        /// <param name="action">The operation to perform.</param>
        /// <param name="state">An opaque object, allowing the caller to identify operations.</param>
        /// <returns>The current instance (for fluent API).</returns>
        public Forker Fork(ThreadStart action, object state)
        {
            if (action == null) throw new ArgumentNullException("action");
            Interlocked.Increment(ref running);
            ThreadPool.QueueUserWorkItem(delegate
            {
                Exception exception = null;
                try { action(); }
                catch (Exception ex) { exception = ex;}
                OnItemComplete(state, exception);
            });
            return this;
        }
    }
    

    【讨论】:

    • (HI MARC!还记得这篇文章吗??)出于好奇,为什么需要 var tmp = obj?我通过传入我的对象来实现它,我得到了疯狂的结果。将其更改为使用 var 最终可以正常工作。我显然不明白的东西!谢谢,看看两年后你是否还记得:)
    • @user 答案有点复杂,但简而言之,这是因为 C# 未能在你甚至没有意识到的情况下悄悄地完成你的意思。它通常很擅长在所有正确的地方明确地做到这一点,但在这种情况下就不行了。
    • 您需要了解代码delegate { DoSomeWork(tmp); } 捕获变量tmp。每次对该代码的调用都会在循环中捕获一个不同的 变量,因为tmp 的范围仅限于循环体。然而,foreach 变量在每次循环中都是 same 变量,所以所有对delegate { DoSomeWork(tmp); } 的调用都会捕获相同的内容。这真的不需要这样;限制 foreach 变量的范围会使许多代码“正常工作”,而人们甚至没有意识到这种情况的棘手。
    • 非常感谢!!我用这个sourceforge.net/projects/icompress
    【解决方案5】:

    我在这里找到了一个很好的解决方案:

    http://msdn.microsoft.com/en-us/magazine/cc163914.aspx

    对于遇到相同问题的其他人可能会派上用场

    【讨论】:

      【解决方案6】:

      使用 .NET 4.0 Barrier 类:

              Barrier sync = new Barrier(1);
      
              foreach(var o in collection)
              {
                  WaitCallback worker = (state) => 
                  {
                      // do work
                      sync.SignalAndWait();
                  };
      
                  sync.AddParticipant();
                  ThreadPool.QueueUserWorkItem(worker, o);
              }
      
              sync.SignalAndWait();
      

      【讨论】:

      • 参与人数有上限。 :(
      【解决方案7】:

      我一直在使用 CTP 中新的并行任务库here

             Parallel.ForEach(collection, o =>
                  {
                      DoSomeWork(o);
                  });
      

      【讨论】:

      • 好建议!在处理异常时也更容易。见:msdn.microsoft.com/en-us/library/dd991486.aspx
      • 要特别小心,因为它使用 ThreadPool 并且不能强制它使用专用(非托管)线程。即使使用带有 LongRunning 选项的底层 TaskFactory 也只能提供调度程序的提示,但不能保证专用线程。
      【解决方案8】:

      这是使用CountdownEvent 类的解决方案。

      var complete = new CountdownEvent(1);
      foreach (var o in collection)
      {
        var capture = o;
        ThreadPool.QueueUserWorkItem((state) =>
          {
            try
            {
              DoSomething(capture);
            }
            finally
            {
              complete.Signal();
            }
          }, null);
      }
      complete.Signal();
      complete.Wait();
      

      当然,如果您可以访问 CountdownEvent 类,那么您就可以使用整个 TPL。 Parallel 类负责等待你。

      Parallel.ForEach(collection, o =>
        {
          DoSomething(o);
        });
      

      【讨论】:

        【解决方案9】:

        尝试使用CountdownEvent

        // code before the threads start
        
        CountdownEvent countdown = new CountdownEvent(collection.Length);
        
        foreach (var o in collection)
        {
            ThreadPool.QueueUserWorkItem(delegate
            {
                // do something with the worker
                Console.WriteLine("Thread Done!");
                countdown.Signal();
            });
        }
        countdown.Wait();
        
        Console.WriteLine("Job Done!");
        
        // resume the code here
        

        倒计时将等到所有线程执行完毕。

        【讨论】:

        【解决方案10】:

        等待线程池中的所有线程完成,没有可用的内置方法。 使用计数号。的线程处于活动状态,我们可以实现它...

        {
                bool working = true;
                ThreadPool.GetMaxThreads(out int maxWorkerThreads, out int maxCompletionPortThreads);
                while (working)
                {
                    ThreadPool.GetAvailableThreads(out int workerThreads, out int completionPortThreads);
                    //Console.WriteLine($"{workerThreads} , {maxWorkerThreads}");
                    if (workerThreads == maxWorkerThreads)
                    { working = false; }
                }
                //when all threads are completed then 'working' will be false 
            }
            void xyz(object o)
            {
                console.writeline("");
            }
        

        【讨论】:

          猜你喜欢
          • 2015-08-29
          • 1970-01-01
          • 2014-03-18
          • 2011-06-02
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多