【问题标题】:How to implement Barrier class from .NET 4 functionality in .NET 3.5如何在 .NET 3.5 中从 .NET 4 功能实现 Barrier 类
【发布时间】:2011-10-16 22:02:57
【问题描述】:

出于某些原因,我必须坚持使用 .NET 3.5,并且我需要 .NET 4 中的 Barrier 类的功能。我有一堆线程在做一些工作,我希望它们互相等待,直到全部完成.当一切都完成后,我希望他们以类似的方式一次又一次地完成这项工作。 在Difference between Barrier in C# 4.0 and WaitHandle in C# 3.0? 线程的鼓励下,我决定使用 AutoResetEvent 和 WaitHandle 类来实现 Barrier 功能。 尽管我的代码遇到了问题:

class Program
{
    const int numOfThreads = 3;

    static AutoResetEvent[] barrier = new AutoResetEvent[numOfThreads];
    static Random random = new Random(System.DateTime.Now.Millisecond);

    static void barriers2(object barrierObj)
    {
        AutoResetEvent[] barrierLocal = (AutoResetEvent[])barrierObj;
        string name = Thread.CurrentThread.Name;
        for (int i = 0; i < 10; i++)
        {
            int sleepTime = random.Next(2000, 10000);
            System.Console.Out.WriteLine("Thread {0} at the 'barrier' will sleep for {1}.", name, sleepTime);
            Thread.Sleep(sleepTime);
            System.Console.Out.WriteLine("Thread {0} at the 'barrier' with time {1}.", name, sleepTime);
            int currentId = Convert.ToInt32(name);
            //for(int z = 0; z < numOfThreads; z++)
                barrierLocal[currentId].Set();
            WaitHandle.WaitAll(barrier);
            /*
            for (int k = 0; k < numOfThreads; k++)
            {
                if (k == currentId)
                {
                    continue;
                }
                System.Console.Out.WriteLine("Thread {0} is about to wait for the singla from thread: {1}", name, k);
                barrierLocal[k].WaitOne();
                System.Console.Out.WriteLine("Thread {0} is about to wait for the singla from thread: {1}. done", name, k);
            }
            */
        }
    }

    static void Main(string[] args)
    {
        for (int i = 0; i < numOfThreads; i++)
        {
            barrier[i] = new AutoResetEvent(false);
        }
        for (int i = 0; i < numOfThreads; i++)
        {
            Thread t = new Thread(Program.barriers2);
            t.Name = Convert.ToString(i);
            t.Start(barrier);
        }
    }
}

我收到的输出如下:

“屏障”处的线程 0 将休眠 7564 'barrier' 处的线程 1 将休眠 5123 'barrier' 处的线程 2 将休眠 4237 线程 2 在 'barrier' 与时间 4237 线程 1 在 'barrier' 与时间 5123 线程 0 在“障碍”处,时间为 7564 'barrier' 处的线程 0 将休眠 8641 线程 0 在 'barrier' 处,时间为 8641

就是这样。在最后一行之后没有更多的输出并且应用程序不会终止。看起来有某种僵局。但是找不到问题。欢迎任何帮助。

谢谢!

【问题讨论】:

    标签: c# multithreading barrier cyclicbarrier


    【解决方案1】:

    那是因为您使用了 AutoResetEvent。线程的 WaitAll() 调用之一将首先完成。这会自动导致所有 ARE 上的 Reset()。这会阻止其他线程完成他们的 WaitAll() 调用。

    此处需要 ManualResetEvent。

    【讨论】:

    • 汉斯,谢谢你的回答。这解释了问题,但在 ManualResetEvent 类的情况下,必须有一些线程在所有线程都通过“障碍”后重置 MRE。知道如何在我的场景中完成吗?
    • 从阅读 Barrier 源代码中获得一些灵感,比如 Reflector。我认为它使用两组 MRE(分别命名为奇数和偶数)并在它们之间交替。
    • 你怎么看这样一个简单的解决方案:
       lock (obj) { threadsCount++; if (threadsCount == numOfThreads) { System.Console.WriteLine("所有线程都完成了。");线程数 = 0; Monitor.PulseAll(obj); } else { Monitor.Wait(obj); } }
    【解决方案2】:

    下载 .NET 3.5 的 Reactive Extensions 反向端口。您会发现 Barrier 类以及 .NET 4.0 中发布的其他有用的并发数据结构和同步机制。

    【讨论】:

    • 嗨,感谢您的回答。是否有可能从这个 backport 获取 Barrier 类,或者我总是必须引用整个包?谢谢。
    • 问题是,它是为了一个大项目的目的,不能强迫许多其他人在生产环境中安装扩展+。
    • 您不必在目标机器上安装 RX 框架。事实上,如果您不想,您不必将它安装在您的开发机器上。只需抓住它附带的 System.Threading.dll 库并像引用任何其他 3rd 方库一样引用它。我就是这么做的。
    【解决方案3】:

    这是我用于 XNA game 的实现。当我写这篇文章时,障碍不可用,我仍然坚持使用 .Net 3.5。它需要三组 ManualResetEvents,和一个计数器数组来保持相位。

    using System;
    using System.Threading;
    
    namespace Colin.Threading
    {
        /// <summary>
        /// Threading primitive for "barrier" sync, where N threads must stop at certain points 
        /// and wait for all their bretheren before continuing.
        /// </summary>
        public sealed class NThreadGate
        {
            public int mNumThreads;
            private ManualResetEvent[] mEventsA;
            private ManualResetEvent[] mEventsB;
            private ManualResetEvent[] mEventsC;
            private ManualResetEvent[] mEventsBootStrap;
            private Object mLockObject;
            private int[] mCounter;
            private int mCurrentThreadIndex = 0;
    
            public NThreadGate(int numThreads)
            {
                this.mNumThreads = numThreads;
    
                this.mEventsA = new ManualResetEvent[this.mNumThreads];
                this.mEventsB = new ManualResetEvent[this.mNumThreads];
                this.mEventsC = new ManualResetEvent[this.mNumThreads];
                this.mEventsBootStrap = new ManualResetEvent[this.mNumThreads];
                this.mCounter = new int[this.mNumThreads];
                this.mLockObject = new Object();
    
                for (int i = 0; i < this.mNumThreads; i++)
                {
                    this.mEventsA[i] = new ManualResetEvent(false);
                    this.mEventsB[i] = new ManualResetEvent(false);
                    this.mEventsC[i] = new ManualResetEvent(false);
                    this.mEventsBootStrap[i] = new ManualResetEvent(false);
                    this.mCounter[i] = 0;
                }
            }
    
            /// <summary>
            /// Adds a new thread to the gate system.
            /// </summary>
            /// <returns>Returns a thread ID for this thread, to be used later when waiting.</returns>
            public int AddThread()
            {
                lock (this.mLockObject)
                {
                    this.mEventsBootStrap[this.mCurrentThreadIndex].Set();
                    this.mCurrentThreadIndex++;
                    return this.mCurrentThreadIndex - 1;
                }
            }
    
            /// <summary>
            /// Stop here and wait for all the other threads in the NThreadGate. When all the threads have arrived at this call, they
            /// will unblock and continue.
            /// </summary>
            /// <param name="myThreadID">The thread ID of the caller</param>
            public void WaitForOtherThreads(int myThreadID)
            {
                // Make sure all the threads are ready.
                WaitHandle.WaitAll(this.mEventsBootStrap);
    
                // Rotate between three phases.
                int phase = this.mCounter[myThreadID];
                if (phase == 0)        // Flip
                {
                    this.mEventsA[myThreadID].Set();
                    WaitHandle.WaitAll(this.mEventsA);
                    this.mEventsC[myThreadID].Reset();
                }
                else if (phase == 1)    // Flop
                {
                    this.mEventsB[myThreadID].Set();
                    WaitHandle.WaitAll(this.mEventsB);
                    this.mEventsA[myThreadID].Reset();
                }
                else    // Floop
                {
                    this.mEventsC[myThreadID].Set();
                    WaitHandle.WaitAll(this.mEventsC);
                    this.mEventsB[myThreadID].Reset();
                    this.mCounter[myThreadID] = 0;
                    return;
                }
    
                this.mCounter[myThreadID]++;
            }
        }
    }
    

    设置线程门:

    private void SetupThreads()
    {
        // Make an NThreadGate for N threads.
        this.mMyThreadGate = new NThreadGate(Environment.ProcessorCount);
    
        // Make some threads...
        // e.g. new Thread(new ThreadStart(this.DoWork);
    }
    

    线程工作者方法:

    private void DoWork()
    {
        int localThreadID = this.mMyThreadGate.AddThread();
    
        while (this.WeAreStillRunning)
        {
            // Signal this thread as waiting at the barrier
            this.mMyThreadGate.WaitForOtherThreads(localThreadID);
    
            // Synchronized work here...
    
            // Signal this thread as waiting at the barrier
            this.mMyThreadGate.WaitForOtherThreads(localThreadID);
    
            // Synchronized work here...
    
            // Signal this thread as waiting at the barrier
            this.mMyThreadGate.WaitForOtherThreads(localThreadID);
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2013-04-10
      • 1970-01-01
      • 2010-09-19
      • 2011-02-09
      • 1970-01-01
      • 2011-09-01
      • 1970-01-01
      • 1970-01-01
      • 2011-02-15
      相关资源
      最近更新 更多