[Question title]: Queue for multiple producers and consumers
[Posted]: 2013-04-17 22:15:52
[Question]:

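I have to write communication between multiple threads (at least 3 for now) in .NET 3.5, and each of them is both a producer and a consumer. Rather than sending signals between each pair of threads, my idea is to implement a message queue that stores instances of a class like this: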

enum Signals { ObjectArrivedOnLightBarrier, ObjectLeftLightBarrier, CodeFound };
enum UnitID { GrabThread, ImageProcessingThread, SaveThread };

// Consumer shows who the message is intended for (and only that unit is allowed to remove it from the queue)
public class QueuedSignal
{
    public Signals Message;   // What happened
    public UnitID Producer;   // Who sent the message
    public UnitID Consumer;   // Who the message is intended for
}

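The idea is that any thread can peek at the first item in the queue and leave it alone if the message is not intended for it (that is not a problem if there are only a few other messages and one of them could be for this thread).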

Is Queue thread-safe when there are multiple producers and consumers?

[Question discussion]:

    Tags: c# multithreading .net-3.5 queue producer-consumer


    [Solution 1]:

    Queue<T> is not thread-safe.
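
    Because Queue<T> is not thread-safe, every access has to go through a single lock, and the "peek, then remove only if it is mine" step from the question has to happen inside one lock acquisition. Here is a minimal sketch of that idea which compiles for .NET 3.5. The names AddressedMessage, AddressedQueue and TryDequeueFor are made up for illustration and are not part of any library:

```csharp
using System;
using System.Collections.Generic;

public enum UnitID { GrabThread, ImageProcessingThread, SaveThread }

// Hypothetical message type, modelled loosely on the question's QueuedSignal.
public sealed class AddressedMessage
{
    public readonly UnitID Consumer;
    public readonly string Text;

    public AddressedMessage(UnitID consumer, string text)
    {
        Consumer = consumer;
        Text = text;
    }
}

// Hypothetical wrapper: every access to the inner Queue<T> takes the same lock.
public sealed class AddressedQueue
{
    private readonly Queue<AddressedMessage> _queue = new Queue<AddressedMessage>();
    private readonly object _sync = new object();

    public void Enqueue(AddressedMessage message)
    {
        if (message == null) throw new ArgumentNullException("message");
        lock (_sync) { _queue.Enqueue(message); }
    }

    // Peeks at the head and removes it only if it is addressed to 'me'.
    // The check and the removal share one lock acquisition, so no other
    // thread can change the head of the queue in between.
    public bool TryDequeueFor(UnitID me, out AddressedMessage message)
    {
        lock (_sync)
        {
            if (_queue.Count > 0 && _queue.Peek().Consumer == me)
            {
                message = _queue.Dequeue();
                return true;
            }
        }

        message = null;
        return false;
    }
}
```

    This sketch never blocks, so callers would have to poll it, and a message at the head of the queue holds up everything behind it until its intended consumer removes it.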

    If you were using .NET 4 or later, I would recommend BlockingCollection<T>.
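
    For anyone who can target .NET 4 or later, here is a rough sketch of what that looks like with one producer and one consumer. The class name BlockingCollectionSketch, the capacity of 8 and the item count of 20 are arbitrary choices for the example:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BlockingCollectionSketch
{
    public static List<int> Run()
    {
        var received = new List<int>();

        // Bounded to 8 items: Add() blocks while the collection is full.
        using (var queue = new BlockingCollection<int>(8))
        {
            var consumer = new Thread(() =>
            {
                // Blocks while the collection is empty; the loop ends once
                // CompleteAdding() has been called and all items are drained.
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    received.Add(item);
                }
            });
            consumer.Start();

            for (int i = 0; i < 20; ++i)
            {
                queue.Add(i);
            }

            queue.CompleteAdding();  // Signal that no more items will be added.
            consumer.Join();         // Only read 'received' after the consumer has finished.
        }

        return received;
    }
}
```

    With the default backing store (a ConcurrentQueue<T>) the items come out in FIFO order. Note that BlockingCollection<T> has no Peek() method either.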

    Unfortunately you can't use that, but there are several concurrent queue implementations around.

    Take a look at this one in Marc Gravell's answer. Unfortunately, it doesn't have a Peek() method.

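    Here is a class that I used before .NET 4 came out; you might find it of interest. It is not the greatest implementation; we used it more as a placeholder until .NET 4 arrived. Even so, here it is: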

    using System;
    using System.Collections.Generic;
    using System.Threading;

    /// <summary>A bounded blocking queue.</summary>
    /// <typeparam name="T">The element type of the queue, which must be a reference type.</typeparam>
    
    public sealed class BoundedBlockingQueue<T>: IDisposable where T: class
    {
        #region Construction and disposal
    
        /// <summary>Constructor.</summary>
        /// <param name="maxQueueSize">
        /// The maximum size of the queue.
        /// Calls to <see cref="Enqueue"/> when the queue is full will block until at least one item has been removed.
        /// Calls to <see cref="Dequeue"/> when the queue is empty will block until a new item is enqueued.
        /// </param>
    
        public BoundedBlockingQueue(int maxQueueSize)
        {
            if (maxQueueSize <= 0)
            {
                throw new ArgumentOutOfRangeException("maxQueueSize");
            }
    
            _queue                  = new Queue<T>(maxQueueSize);
            _itemsAvailable         = new Semaphore(0, maxQueueSize);
            _spaceAvailable         = new Semaphore(maxQueueSize, maxQueueSize);
            _queueStopped           = new ManualResetEvent(false);
            _queueStoppedAndEmpty   = new ManualResetEvent(false);
            _stoppedOrItemAvailable = new WaitHandle[] { _queueStopped, _itemsAvailable };
        }
    
        /// <summary>Disposal.</summary>
    
        public void Dispose()
        {
            if (_itemsAvailable != null)
            {
                _itemsAvailable.Close();
                _spaceAvailable.Close();
                _queueStopped.Close();
                _queueStoppedAndEmpty.Close();
                _itemsAvailable = null;          // Use _itemsAvailable as a flag to indicate that Dispose() has been called.
            }
        }
    
        #endregion Construction and disposal
    
        #region Public properties
    
        /// <summary>The number of items currently in the queue.</summary>
    
        public int Count
        {
            get
            {
                throwIfDisposed();
    
                lock (_queue)
                {
                    return _queue.Count;
                }
            }
        }
    
        /// <summary>Has <see cref="Stop"/> been called?</summary>
    
        public bool Stopped
        {
            get
            {
                throwIfDisposed();
                return _stopped;
            }
        }
    
        #endregion Public properties
    
        #region Public methods
    
        /// <summary>
        /// Signals that new items will no longer be placed into the queue.
        /// After this is called, calls to <see cref="Dequeue"/> will return null immediately if the queue is empty.
        /// Before this is called, calls to <see cref="Dequeue"/> will block if the queue is empty.
        /// Attempting to enqueue items after this has been called will cause an exception to be thrown.
        /// </summary>
        /// <remarks>
        /// If you use a different thread to enqueue items than the thread that calls Stop() you might get a race condition.
        /// 
        /// If the queue is full and a thread calls Enqueue(), that thread will block until space becomes available in the queue.
        /// If a different thread then calls Stop() while the other thread is blocked in Enqueue(), the item enqueued by the other
        /// thread may become lost since it will be enqueued after the special null value used to indicate the end of the
        /// stream is enqueued.
        /// 
        /// To prevent this from happening, you must enqueue from the same thread that calls Stop(), or provide another
        /// synchronisation mechanism to avoid this race condition.
        /// </remarks>
    
        public void Stop()
        {
            throwIfDisposed();
    
            lock (_queue)
            {
                _queueStopped.Set();
                _stopped = true;
            }
        }
    
        /// <summary>
        /// Returns the front item of the queue without removing it, or null if the queue is currently empty.
        /// A null return does NOT indicate that <see cref="Stop"/> has been called.
        /// This never blocks.
        /// </summary>
        /// <returns>The front item of the queue, or null if the queue is empty.</returns>
    
        public T Peek()
        {
            throwIfDisposed();
            T result;
    
            lock (_queue)
            {
                if (_queue.Count > 0)
                {
                    result = _queue.Peek();
                }
                else
                {
                    result = null;
                }
            }
    
            return result;
        }
    
        /// <summary>
        /// Enqueues a new non-null item.
        /// If there is no room in the queue, this will block until there is room.
        /// An exception will be thrown if <see cref="Stop"/> has been called.
        /// </summary>
        /// <param name="item">The item to be enqueued. This may not be null.</param>
    
        public void Enqueue(T item)
        {
            throwIfDisposed();
    
            if (item == null)
            {
                throw new ArgumentNullException("item");
            }
    
            if (_stopped)
            {
                throw new InvalidOperationException("Attempting to enqueue an item to a stopped queue.");
            }
    
            this.enqueue(item);
        }
    
        /// <summary>
        /// Dequeues the next available item.
        /// If <see cref="Stop"/> has been called, this returns null if the queue is empty.
        /// Otherwise it blocks until an item becomes available (or <see cref="Stop"/> is called).
        /// </summary>
        /// <returns>The next available item, or null if the queue is empty and stopped.</returns>
    
        public T Dequeue()
        {
            throwIfDisposed();
    
            if (_isQueueStoppedAndEmpty)
            {
                return null;
            }
    
            WaitHandle.WaitAny(_stoppedOrItemAvailable);
    
            lock (_queue)
            {
                if (_stopped && (_queue.Count == 0))
                {
                    _isQueueStoppedAndEmpty = true;
                    _queueStoppedAndEmpty.Set();
                    return null;
                }
                else
                {
                    T item = _queue.Dequeue();
                    _spaceAvailable.Release();
                    return item;
                }
            }
        }
    
        /// <summary>Waits forever for the queue to become empty AND stopped.</summary>
    
        public void WaitUntilStoppedAndEmpty()
        {
            throwIfDisposed();
            WaitUntilStoppedAndEmpty(Timeout.Infinite);
        }
    
        /// <summary>Waits up to the specified time for the queue to become empty AND stopped.</summary>
        /// <param name="timeoutMilliseconds">The maximum wait time, in milliseconds.</param>
        /// <returns>True if the wait succeeded, false if it timed-out.</returns>
    
        public bool WaitUntilStoppedAndEmpty(int timeoutMilliseconds)
        {
            throwIfDisposed();
            return _queueStoppedAndEmpty.WaitOne(timeoutMilliseconds);
        }
    
        #endregion Public methods
    
        #region Private methods
    
        /// <summary>Enqueues a new item (which may be null to indicate the last item to go into the queue).</summary>
        /// <param name="item">An item to enqueue.</param>
    
        private void enqueue(T item)
        {
            _spaceAvailable.WaitOne();
    
            lock (_queue)
            {
                _queue.Enqueue(item);
            }
    
            _itemsAvailable.Release();
        }
    
        /// <summary>Throws if this object has been disposed.</summary>
    
        private void throwIfDisposed()
        {
            if (_itemsAvailable == null)
            {
                throw new ObjectDisposedException(this.GetType().FullName);
            }
        }
    
        #endregion Private methods
    
        #region Fields
    
        /// <summary>
        /// Contains wait handles for "stopped" and "item available".
        /// Therefore using this for WaitAny() will wait until the queue is stopped
        /// or an item becomes available, whichever is the sooner.
        /// </summary>
    
        private readonly WaitHandle[] _stoppedOrItemAvailable;
    
        private Semaphore _itemsAvailable;
    
        private volatile bool _stopped;
        private volatile bool _isQueueStoppedAndEmpty;
    
        private readonly Queue<T> _queue;
        private readonly Semaphore _spaceAvailable;
        private readonly ManualResetEvent _queueStopped;
        private readonly ManualResetEvent _queueStoppedAndEmpty;
    
        #endregion Fields
    }
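
    To show how Stop() and the null return from Dequeue() fit together, here is a small sketch of a consumer loop. It assumes the BoundedBlockingQueue<T> class above is compiled into the same project; the class name StopSketch, the capacity of 4 and the item count of 10 are arbitrary:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class StopSketch
{
    public static List<string> Run()
    {
        var received = new List<string>();

        using (var queue = new BoundedBlockingQueue<string>(4))
        {
            var consumer = new Thread(() =>
            {
                string item;

                // Dequeue() blocks while the queue is empty and returns null
                // only once the queue has been stopped and fully drained.
                while ((item = queue.Dequeue()) != null)
                {
                    received.Add(item);
                }
            });
            consumer.Start();

            for (int i = 0; i < 10; ++i)
            {
                queue.Enqueue(i.ToString());  // Blocks whenever 4 items are pending.
            }

            // Stop() is called from the same thread that enqueues,
            // as the remarks on Stop() advise, to avoid losing items.
            queue.Stop();

            consumer.Join();  // Wait for the consumer before disposing the queue.
        }

        return received;
    }
}
```

    The queue is only disposed after the consumer thread has exited, so no thread is still blocked inside Dequeue() when the wait handles are closed.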
    

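    Here is an old unit test for it. It is not a very good unit test; it tests too many things at once and has some other problems, but it does demonstrate how to use the queue: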

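    // Note: DateTimeFunctions.TickCount and Worker.Run are not .NET Framework members;
    // they appear to be helpers from the author's own codebase and are not shown here.
    [TestMethod]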
    
    public void TestBoundedBlockingQueue()
    {
        int maxQueueSize = 8;
    
        using (var queue = new BoundedBlockingQueue<string>(maxQueueSize))
        {
            // Fill the queue, but don't block.
    
            for (int i = 0; i < maxQueueSize; ++i)
            {
                int start1 = DateTimeFunctions.TickCount;
                queue.Enqueue(i.ToString());
                int elapsed1 = DateTimeFunctions.TickCount - start1;
                Assert.IsTrue(elapsed1 < 100, "Took too long to enqueue an item.");  // Shouldn't have taken more than 100 ms to enqueue the item.
            }
    
            // Now if we try to enqueue something we should block (since the queue should be full).
            // We can detect this by starting a thread that will dequeue something in a few seconds
            // and then seeing how long the main thread took to enqueue something.
            // It should have taken around the thread sleep time (+/- half a second or so).
    
            int sleepTime = 2500;
            int tolerance = 500;
            Worker.Run(()=>{Thread.Sleep(sleepTime); queue.Dequeue();}, "TestBoundedBlockingQueue Dequeue()");
            int start2 = DateTimeFunctions.TickCount;
            queue.Enqueue(maxQueueSize.ToString());
            int elapsed2 = DateTimeFunctions.TickCount - start2;
            Assert.IsTrue(Math.Abs(elapsed2 - sleepTime) <= tolerance, "Didn't take the right time to enqueue an item.");
    
            // Now verify that the remaining items in the queue are the expected ones,
            // i.e. from "1".."maxQueueSize" (since the first item, "0", has been dequeued).
    
            for (int i = 1; i <= maxQueueSize; ++i)
            {
                Assert.AreEqual(i.ToString(), queue.Dequeue(), "Incorrect item dequeued.");
            }
    
            Assert.AreEqual(0, queue.Count);
    
            // Now if we try to dequeue something we should block (since the queue is empty).
            // We can detect this by starting a thread that will enqueue something after sleepTime milliseconds
            // and then seeing how long the main thread took to dequeue something.
            // It should have taken around the thread sleep time (+/- half a second or so).
    
            string testValue = "TEST";
            Worker.Run(()=>{Thread.Sleep(sleepTime); queue.Enqueue(testValue);}, "TestBoundedBlockingQueue Enqueue()");
            start2 = DateTimeFunctions.TickCount;
            Assert.AreEqual(testValue, queue.Dequeue(), "Incorrect item dequeued");
            elapsed2 = DateTimeFunctions.TickCount - start2;
            Assert.IsTrue(Math.Abs(elapsed2 - sleepTime) <= tolerance, "Didn't take the right time to dequeue an item.");
        }
    }
    

    [Discussion]:

      [Solution 2]:

      The June 2008 CTP of the Parallel Extensions for .NET contains a BlockingCollection<T> class that does what you want, although it probably doesn't have a Peek method. That library works with .NET 3.5. I have used it quite a lot.

      I have not been able to find a place to download it, but you could search for it.

      It might be available in the Reactive Extensions. Newer versions of Rx are for .NET 4.5, but there is an older version available at http://www.microsoft.com/en-us/download/details.aspx?id=28568

      [Discussion]:
