【问题标题】:Processing a Queue in Parallel并行处理队列
【发布时间】:2012-07-04 12:02:38
【问题描述】:

我有一个处理消息的类:

public abstract class ProcessingBase {
    public bool IsBusy { get; set; }
    public Queue<CustomMessage> PendingMessages { get; private set; }
    public abstract MessageProcessingResult Process();
    ...

到目前为止,在我创建的处理模块中,一个使用队列来处理消息,通过套接字发送消息,另一个发送电子邮件。这适用于单个处理模块,但假设我想链接这两个?

我在想我可以这样做:

public class ChainProcessor : ProcessingBase {
    public List<ProcessingBase> Processors { get; set; }
    public override MessageProcessingResult Process() {
                if (IsBusy)
                    return null;
                IsBusy = true;

                CustomMessage msg = null;
                this.ProcessedMessages = new List<CustomMessage>();

                // create clone of queue 
                var messagesToSend = new Queue<CustomMessage>(this.PendingMessages);
                this.PendingMessages.Clear();
                while (messagesToSend.Count > 0 && (msg = messagesToSend.Dequeue()) != null) {
                    foreach (var processor in this.Processors) {
                        // something with yield return?
                    }
                }

这是让我动摇的实际链接。理想情况下,我想以一种“波浪”的方式处理它们。例如:

Module 1 - Processed message A
Module 2 - Processed message A
Module 1 - Processed message B
Module 2 - Processed message B
Module 3 - Processed message A
Module 1 - Processed message C

每条消息都通过链传递,消息在传递过程中不断进出。这样做的最佳方法是什么?或者我是否仅限于通过整个链顺序传递每条消息?

即(我不想要的): 模块 1 - 处理的消息 A 模块 2 - 处理的消息 A 模块 3 - 处理的消息 A 模块 1 - 处理的消息 B 模块 2 - 处理的消息 B 模块 3 - 处理的消息 B 模块 1 - 处理的消息 C

编辑:我希望我可以做一些事情,让第一个处理器产生返回给“链处理器”,这会将消息传递到模块 2,然后链可能会启动处理器 1 上的下一条消息

【问题讨论】:

  • 我真的不介意什么线程,重要的是它们不是按顺序处理的(问题已更新)。

标签: c# .net parallel-processing queue chaining


【解决方案1】:

我认为你可以使用一个

   BlockingCollection<CustomMessage>

每个模块。然后,您可以创建多个线程以从每个集合中读取并执行模块的处理工作。之后将其放入下一个模块的队列中。

通过创建多个线程,您可以并行工作。 BlockingCollection 是线程安全的、快速的和阻塞的。很方便。见http://msdn.microsoft.com/en-us/library/dd267312.aspx

【讨论】:

    【解决方案2】:

    您正在寻找线程安全ConcurrentQueueAutoResetEvent,女巫可以唤醒休眠线程(参见示例)。

    每个工作线程都在自己的线程上运行(如果您对主线程有一些工作),在完成工作并清空输入队列后,调用AutoResetEvent.WaitOne()。如果输入队列被某个项目填满,请调用AutoResetEvent.Set(),它会再次启动该过程。

    您可以通过使用第一个工作人员处理的项目创建第二个队列来链接这两个工作人员。在第一个工作人员处理完消息后,将项目添加到输出队列并调用AutoResetEvent.Set()

    三思而后行,我不了解您的情况,因此您可以根据需要优化流程。

    祝你好运。 ;-)

    【讨论】:

      猜你喜欢
      • 2013-02-16
      • 2012-01-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-03-23
      • 1970-01-01
      相关资源
      最近更新 更多