【问题标题】:Process queue with multithreading or tasks具有多线程或任务的进程队列
【发布时间】:2014-05-06 11:44:31
【问题描述】:

我有一个电话消息应用程序,其中有很多消息要处理。由于电话端口有限,所以消息将被处理先进先出。每条消息都有一个“确认”标志,指示处理的是哪个消息。它当然被初始化为假。

我想将所有消息放入一个队列,然后用多个线程或任务处理它们。

    public class MessageQueue
    {
        public Queue MessageWorkItem { get; set; }
        public Messages Message { get; set; }
        public MessageQueue()
        {
            MessageWorkItem = new Queue();
            Message = new Messages();
        }
        public void GetMessageMetaData()
        {
            try
            {
                // It is just a test, add only one item into the queue
                Message.MessageID = Guid.NewGuid();
                Message.NumberToCall = "1111111111";
                Message.FacilityID = "3333";
                Message.NumberToDial = "2222222222";
                Message.CountryCode = "1";
                Message.Acknowledge = false;
            }
            catch (Exception ex)
            {
            }
        }

        public void AddingItemToQueue()
        {
            GetMessageMetaData();
            if (!Message.Acknowledge)
            {
                lock (MessageWorkItem)
                {
                    MessageWorkItem.Enqueue(Message);
                }
            }
        }
    }

    public class Messages
    {
        public Guid MessageID { get; set; }
        public string NumberToCall { get; set; }
        public string FacilityID { get; set; }
        public string NumberToDial { get; set; }
        public string CountryCode { get; set; }
        public bool Acknowledge { get; set; }
    }

现在我的问题是如何使用多线程从队列中取出项目。 对于队列中的每一项,我都想运行一个脚本。

        public void RunScript(Message item)
        {
            try
            {
                PlayMessage(item); 
                return;
            }
            catch (HangupException hex)
            {
                Log.WriteWithId("Caller Hungup!", hex.Message);
            }
            catch (Exception ex)
            {
                Log.WriteException(ex, "Unexpected exception: {0}");
            }
        }

我的想法是看看

if(MessageWorkItem.Count >= 1) 然后做点什么,但我确实需要代码帮助。

【问题讨论】:

  • 查看生产者/消费者模式(问题)并彻底重组你的类。注意命名。
  • 你正在使用哪个版本的框架
  • @@K.B、.NET 4 或 .NET 4.5。哪个都好。

标签: c# multithreading task-parallel-library


【解决方案1】:

Parallel.ForEach 来自TPL。它是并行的。

示例(将 MessageWorkItem 更改为通用队列):

    public class MessageQueue
{
    public Queue<Message> MessageWorkItem { get; set; }

    public MessageQueue()
    {
        MessageWorkItem = new Queue<Message>();
    }

    public Message GetMessageMetaData()
    {
        try
        {
            // It is just a test, add only one item into the queue
            return new Message()
            {
                MessageID = Guid.NewGuid(),
                NumberToCall = "1111111111",
                FacilityID = "3333",
                NumberToDial = "2222222222",
                CountryCode = "1",
                Acknowledge = false
            };
        }
        catch (Exception ex)
        {
            return null;
        }
    }

    public void AddingItemToQueue()
    {
        var message = GetMessageMetaData();
        if (!message.Acknowledge)
        {
            lock (MessageWorkItem)
            {
                MessageWorkItem.Enqueue(message);
            }
        }
    }
}

public class Message
{
    public Guid MessageID { get; set; }
    public string NumberToCall { get; set; }
    public string FacilityID { get; set; }
    public string NumberToDial { get; set; }
    public string CountryCode { get; set; }
    public bool Acknowledge { get; set; }
}

class Program
{
    static void Main(string[] args)
    {
        MessageQueue me = new MessageQueue();
        for (int i = 0; i < 10000; i++)
            me.AddingItemToQueue();

        Console.WriteLine(me.MessageWorkItem.Count);

        Parallel.ForEach(me.MessageWorkItem, RunScript);
    }

    static void RunScript(Message item)
    {
        // todo: ...
        Console.WriteLine(item.MessageID);
        Thread.Sleep(300);
    }
}

【讨论】:

  • 如果消费者继续工作并将内容添加到队列中,这将不起作用
  • 您还在迭代队列时将项目添加到队列中,这将引发异常,并且还会受到各种竞争条件的影响,因为它甚至是从不同的线程完成的. Queue 根本不是为同时从多个线程访问而构建的。
  • @Servy 对于“在迭代队列时将项目添加到队列”问题,您是 100% 正确的,但我想对您的第二个陈述作出澄清。 Parallel.ForEach 不会使用默认分区程序对IEnumerable&lt;T&gt; 进行多线程访问,它会使用单个线程将一堆项目拉入内部队列,然后将该工作分发给多个工作线程。但我再次强调 Servy 是正确的,即从单个线程读取并使用一个或多个其他线程写入对于 Queue 的枚举器是不可行的。
【解决方案2】:

如果您可以使用 .Net 4.5,我建议您查看 Dataflow from the the Task Parallel Library (TPL)

该页面提供了很多示例演练,例如 How to: Implement a Producer-Consumer Dataflow PatternWalkthrough: Using Dataflow in a Windows Forms Application

查看该文档,看看它是否对您有帮助。需要考虑的内容很多,但我认为这可能是您最好的方法。

或者,您可以考虑使用BlockingCollection 及其GetConsumingEnumerable() 方法来访问队列中的项目。

您所做的是将工作拆分为您希望以某种方式处理的对象,并使用 BlockingCollection 来管理队列。

一些使用ints而不是对象作为工作项的示例代码将有助于证明这一点:

当工作线程完成其当前项时,它将从工作队列中删除一个新项,处理该项,然后将其添加到输出队列中。

一个单独的消费者线程从输出队列中移除已完成的项目并对它们进行处理。

最后,我们必须等待所有工作人员完成 (Task.WaitAll(workers)),然后才能将输出队列标记为已完成 (outputQueue.CompleteAdding())。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            new Program().run();
        }

        void run()
        {
            int threadCount = 4;
            Task[] workers = new Task[threadCount];

            Task.Factory.StartNew(consumer);

            for (int i = 0; i < threadCount; ++i)
            {
                int workerId = i;
                Task task = new Task(() => worker(workerId));
                workers[i] = task;
                task.Start();
            }

            for (int i = 0; i < 100; ++i)
            {
                Console.WriteLine("Queueing work item {0}", i);
                inputQueue.Add(i);
                Thread.Sleep(50);
            }

            Console.WriteLine("Stopping adding.");
            inputQueue.CompleteAdding();
            Task.WaitAll(workers);
            outputQueue.CompleteAdding();
            Console.WriteLine("Done.");

            Console.ReadLine();
        }

        void worker(int workerId)
        {
            Console.WriteLine("Worker {0} is starting.", workerId);

            foreach (var workItem in inputQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
                Thread.Sleep(100);          // Simulate work.
                outputQueue.Add(workItem);  // Output completed item.
            }

            Console.WriteLine("Worker {0} is stopping.", workerId);
        }

        void consumer()
        {
            Console.WriteLine("Consumer is starting.");

            foreach (var workItem in outputQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Consumer is using item {0}", workItem);
                Thread.Sleep(25);
            }

            Console.WriteLine("Consumer is finished.");
        }

        BlockingCollection<int> inputQueue = new BlockingCollection<int>();
        BlockingCollection<int> outputQueue = new BlockingCollection<int>();
    }
}

【讨论】:

  • @@Mattew,我可以使用 .net 4.5。
  • @Love 啊,你的问题上有 .Net 4.0 标签(有人已经为你解决了这个问题,我注意到了!:)
  • @Love 我根据上述信息稍微编辑了我的答案。
  • --@Mattew,对于我的特殊情况,我应该将我的 RunScript() 放在哪里?我猜在 void worker() 中?
  • @Love 是的,那将是合适的地方。有效负载将是workItem,您可以使用任何类型来保存您想要作为工作单元处理的信息 - 在您的情况下可能是Message 的一个实例?在这种情况下,您将使用 BlockingCollection&lt;Message&gt;
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-10-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-05-16
相关资源
最近更新 更多