【问题标题】:How can I modify a queue collection in a loop?如何在循环中修改队列集合?
【发布时间】:2010-02-06 19:42:09
【问题描述】:

我有一个场景,我需要在处理完队列后立即删除一个项目。 我知道我无法在循环中从集合中删除一个项目,但想知道是否有什么 可以用枚举器等来完成......

这只是一个引发错误的基本示例 "枚举器实例化后集合被修改。"

有什么建议吗?非常感谢!!!

代码如下:

     class Program
        {
            static void Main()
            {

                Queue<Order> queueList = GetQueueList();

                foreach (Order orderItem in queueList)
                {
                    Save(orderItem);
                    Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name);
                    queueList.Dequeue();
                }
                Console.Read();

            }

            private static void Save(Order orderItem)
            {
               //we are pretending to save or do something.
            }

            private static Queue<Order>GetQueueList()
            {
                Queue<Order> orderQueue = new Queue<Order>();
                orderQueue.Enqueue(new Order { Id = 1, Name = "Order 1" });
                orderQueue.Enqueue(new Order { Id = 1, Name = "Order 2" });
                orderQueue.Enqueue(new Order { Id = 2, Name = "Order 3" });
                orderQueue.Enqueue(new Order { Id = 3, Name = "Order 4" });
                orderQueue.Enqueue(new Order { Id = 4, Name = "Order 5" });
                return orderQueue;
            }
        }

        public  class Order
        {
            public int Id { get; set; }
            public string Name { get; set; }
        }

【问题讨论】:

    标签: c#


    【解决方案1】:

    将你的 foreach 更改为:

    while (queueList.Count > 0)
    {
        Order orderItem = queueList.Dequeue();
        Save(orderItem);
        Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name);
    }
    

    编辑:

    如果保存失败,要重新处理,请执行以下操作:

    while (queueList.Count > 0)
    {
        Order orderItem = queueList.Dequeue();
    
        if (!Save(orderItem))
        {
            queueList.Enqueue(orderItem); // Reprocess the failed save, probably want more logic to prevent infinite loop
        }
        else
        {
            Console.WriteLine("Successfully saved: {0} Name {1} ", orderItem.Id, orderItem.Name);
        }
    }
    

    编辑:

    John K 提到线程安全,如果您有多个线程访问同一个Queue,这是一个有效的问题。请参阅 http://ccutilities.codeplex.com/SourceControl/changeset/view/40529#678487 了解涵盖简单线程安全问题的 ThreadSafeQueue 类。


    编辑:这是我一直向大家指出的线程安全示例 :-)

    这里是一个提到的线程安全问题的例子。如图所示,默认的Queue 可以“错过”项目,同时仍然减少计数。

    更新:为了更好地表达问题。我从不向Queue 添加空项,但标准Queue.Dequeue() 返回几个空值。仅此一项就可以了,但是这样做会从内部集合中删除一个有效项目,并且Count 会减少。在此特定示例中,可以安全地假设从 Queue.Dequeue() 操作返回的每个 null 项目代表一个从未处理过的有效项目。

    using System;
    using System.Collections.Generic;
    using System.Threading;
    
    namespace SO_ThreadSafeQueue
    {
        class Program
        {
            static int _QueueExceptions;
            static int _QueueNull;
            static int _QueueProcessed;
    
            static int _ThreadSafeQueueExceptions;
            static int _ThreadSafeQueueNull;
            static int _ThreadSafeQueueProcessed;
    
            static readonly Queue<Guid?> _Queue = new Queue<Guid?>();
            static readonly ThreadSafeQueue<Guid?> _ThreadSafeQueue = new ThreadSafeQueue<Guid?>();
            static readonly Random _Random = new Random();
    
            const int Expected = 10000000;
    
            static void Main()
            {
                Console.Clear();
                Console.SetCursorPosition(0, 0);
                Console.WriteLine("Creating queues...");
    
                for (int i = 0; i < Expected; i++)
                {
                    Guid guid = Guid.NewGuid();
                    _Queue.Enqueue(guid);
                    _ThreadSafeQueue.Enqueue(guid);
                }
    
                Console.SetCursorPosition(0, 0);
                Console.WriteLine("Processing queues...");
    
                for (int i = 0; i < 100; i++)
                {
                    ThreadPool.QueueUserWorkItem(ProcessQueue);
                    ThreadPool.QueueUserWorkItem(ProcessThreadSafeQueue);
                }
    
                int progress = 0;
    
                while (_Queue.Count > 0 || _ThreadSafeQueue.Count > 0)
                {
                    Console.SetCursorPosition(0, 0);
    
                    switch (progress)
                    {
                        case 0:
                            {
                                Console.WriteLine("Processing queues... |");
                                progress = 1;
                                break;
                            }
                        case 1:
                            {
                                Console.WriteLine("Processing queues... /");
                                progress = 2;
                                break;
                            }
                        case 2:
                            {
                                Console.WriteLine("Processing queues... -");
                                progress = 3;
                                break;
                            }
                        case 3:
                            {
                                Console.WriteLine("Processing queues... \\");
                                progress = 0;
                                break;
                            }
                    }
    
                    Thread.Sleep(200);
                }
    
                Console.SetCursorPosition(0, 0);
                Console.WriteLine("Finished processing queues...");
                Console.WriteLine("\r\nQueue Count:           {0} Processed: {1, " + Expected.ToString().Length + "} Exceptions: {2,4} Null: {3}", _Queue.Count, _QueueProcessed, _QueueExceptions, _QueueNull);
                Console.WriteLine("ThreadSafeQueue Count: {0} Processed: {1, " + Expected.ToString().Length + "} Exceptions: {2,4} Null: {3}", _ThreadSafeQueue.Count, _ThreadSafeQueueProcessed, _ThreadSafeQueueExceptions, _ThreadSafeQueueNull);
    
                Console.WriteLine("\r\nPress any key...");
                Console.ReadKey();
            }
    
            static void ProcessQueue(object nothing)
            {
                while (_Queue.Count > 0)
                {
                    Guid? currentItem = null;
    
                    try
                    {
                        currentItem = _Queue.Dequeue();
                    }
                    catch (Exception)
                    {
                        Interlocked.Increment(ref _QueueExceptions);
                    }
    
                    if (currentItem != null)
                    {
                        Interlocked.Increment(ref _QueueProcessed);
                    }
                    else
                    {
                        Interlocked.Increment(ref _QueueNull);
                    }
    
                    Thread.Sleep(_Random.Next(1, 10)); // Simulate different workload times
                }
            }
    
            static void ProcessThreadSafeQueue(object nothing)
            {
                while (_ThreadSafeQueue.Count > 0)
                {
                    Guid? currentItem = null;
    
                    try
                    {
                        currentItem = _ThreadSafeQueue.Dequeue();
                    }
                    catch (Exception)
                    {
                        Interlocked.Increment(ref _ThreadSafeQueueExceptions);
                    }
    
                    if (currentItem != null)
                    {
                        Interlocked.Increment(ref _ThreadSafeQueueProcessed);
                    }
                    else
                    {
                        Interlocked.Increment(ref _ThreadSafeQueueNull);
                    }
    
                    Thread.Sleep(_Random.Next(1, 10)); // Simulate different workload times
                }
            }
    
            /// <summary>
            /// Represents a thread safe <see cref="Queue{T}"/>
            /// </summary>
            /// <typeparam name="T"></typeparam>
            public class ThreadSafeQueue<T> : Queue<T>
            {
                #region Private Fields
                private readonly object _LockObject = new object();
                #endregion
    
                #region Public Properties
                /// <summary>
                /// Gets the number of elements contained in the <see cref="ThreadSafeQueue{T}"/>
                /// </summary>
                public new int Count
                {
                    get
                    {
                        int returnValue;
    
                        lock (_LockObject)
                        {
                            returnValue = base.Count;
                        }
    
                        return returnValue;
                    }
                }
                #endregion
    
                #region Public Methods
                /// <summary>
                /// Removes all objects from the <see cref="ThreadSafeQueue{T}"/>
                /// </summary>
                public new void Clear()
                {
                    lock (_LockObject)
                    {
                        base.Clear();
                    }
                }
    
                /// <summary>
                /// Removes and returns the object at the beggining of the <see cref="ThreadSafeQueue{T}"/>
                /// </summary>
                /// <returns></returns>
                public new T Dequeue()
                {
                    T returnValue;
    
                    lock (_LockObject)
                    {
                        returnValue = base.Dequeue();
                    }
    
                    return returnValue;
                }
    
                /// <summary>
                /// Adds an object to the end of the <see cref="ThreadSafeQueue{T}"/>
                /// </summary>
                /// <param name="item">The object to add to the <see cref="ThreadSafeQueue{T}"/></param>
                public new void Enqueue(T item)
                {
                    lock (_LockObject)
                    {
                        base.Enqueue(item);
                    }
                }
    
                /// <summary>
                /// Set the capacity to the actual number of elements in the <see cref="ThreadSafeQueue{T}"/>, if that number is less than 90 percent of current capactity.
                /// </summary>
                public new void TrimExcess()
                {
                    lock (_LockObject)
                    {
                        base.TrimExcess();
                    }
                }
                #endregion
            }
    
        }
    }
    

    【讨论】:

    • 嗨,这行得通。如果我只想在保存成功的情况下将项目出列怎么办。我还能这样做吗?对不起+谢谢不熟悉队列
    • @devnet247:不是真的。如果您不将顶部的项目出列,则无法获得其后的项目。您需要像此示例一样将失败的项目移动到队列的 tail
    • 非常感谢您的帮助。现在就可以了。我必须实现全线程,这个过程发生在 wcf 服务中。要学习的另一件事。再次感谢
    • 还有 Queue.Peek 允许您从队列中获取项目而无需出队。如果一切顺利,您可以稍后将其出列。
    • @m3rLinEz:好点。这在单线程应用程序中效果很好,但我更喜欢 Dequeue()/Enqueue() on failure 模式,因此它可以扩展到多个线程,而无需担心两个线程处理同一项目。
    【解决方案2】:

    foreach 作为一种合理的方式来遍历队列当您不删除项目时

    当你想删除和处理项目时,线程安全、正确的方法就是删除它们 一次一个,并在它们被删除后处理它们。

    一种方法是这样的

    // the non-thread safe way
    //
    while (queueList.Count > 0)
    {
        Order orderItem = queueList.Dequeue();
        Save(orderItem);
        Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name);
    }
    

    队列中的项目数可能会在 queueList.Count 之间发生变化 和queueList.Dequeue(),所以为了线程安全,你只需要使用Dequeue,但是当队列为空时Dequeue会抛出,所以你必须使用异常处理程序。

    // the thread safe way.
    //
    while (true)
    {
        Order orderItem = NULL;
        try { orderItem = queueList.Dequeue(); } catch { break; }
        if (null != OrderItem)
        {
            Save(orderItem);
            Console.WriteLine("Id :{0} Name {1} ", orderItem.Id, orderItem.Name);
        }
    }
    

    【讨论】:

    • 围绕 catch 的尝试不会解决线程安全问题,因为许多线程访问同一个队列时,Dequeue() 可能会在实际从内部集合中删除一个项目时返回一个空对象。请参阅ccutilities.codeplex.com/SourceControl/changeset/view/… 以获得更好的选择。
    • @John:不只是经验。我最近编写了一个应用程序,它产生了 100 个线程来处理来自单个 Queue 的文件,并注意到即使我的 Queue.Count 归零,其他“已处理”计数器并没有与初始 Queue.Count 相加。我添加到ThreadSafeQueue 的简单锁提供了一致的结果,无论我在同一个Queue 中抛出多少线程
    • 谢谢你们的帮助!!现在会看那篇文章
    • @Cory:很高兴知道。我发现.net 队列类不是线程安全的,但很明显他们没有考虑线程,因为出队抛出然后队列为空。
    • @John:我编辑了我的答案以提供我所说的行为示例。
    【解决方案3】:

    在我看来,您似乎正在尝试一个接一个地处理队列中的元素。

    如何将其包装在while 循环中并处理来自 Dequeue 的每个元素,直到队列为空?

    【讨论】:

    • 在生产代码中,我们有一个要处理的订单队列,在每个订单之后我都需要将它们出列。你能用一点sn-p告诉我你的意思吗?谢谢
    猜你喜欢
    • 1970-01-01
    • 2013-10-09
    • 1970-01-01
    • 2014-09-19
    • 1970-01-01
    • 2012-05-17
    • 2010-11-10
    • 1970-01-01
    • 2011-09-11
    相关资源
    最近更新 更多