【问题标题】:Multithread queue of jobs作业的多线程队列
【发布时间】:2016-09-19 22:21:02
【问题描述】:

我有一个可以由多个线程填充的作业队列 (ConcurrentQueue<MyJob>)。我需要异步(不是通过主线程)实现这些作业的连续执行,但只能由一个线程同时执行。我尝试过这样的事情:

public class ConcurrentLoop {
    private static ConcurrentQueue<MyJob> _concurrentQueue = new ConcurrentQueue<MyJob>();

    private static Task _currentTask;
    private static object _lock = new object();

    public static void QueueJob(Job job)
    {
        _concurrentQueue.Enqueue(job);
        checkLoop();
    }

    private static void checkLoop()
    {
        if ( _currentTask == null || _currentTask.IsCompleted )
        {
            lock (_lock)
            {
                if ( _currentTask == null || _currentTask.IsCompleted )
                {
                    _currentTask = Task.Run(() =>
                    {
                            MyJob current;
                            while( _concurrentQueue.TryDequeue( out current ) ) 
                                //Do something                                                       
                    });
                }
            }
        }
    }
}

我认为这段代码有一个问题:如果任务完成执行(TryDequeue 返回 false,但任务尚未被标记为已完成)并且此时我得到一个新工作,它将不会被执行。我对吗?如果是这样,如何解决这个问题

【问题讨论】:

  • 您尝试限制线程数是否有真正的原因?最好让框架大部分时间为你处理优化
  • 对要由一个线程一次执行的代码块使用锁定。这应该可以解决问题。
  • 锁定ConcurrentQueue?我相信有更好的方法
  • @konkked,是的。
  • @xalz ...好吧,那是什么?

标签: c# multithreading task-parallel-library concurrent-collections


【解决方案1】:

您的问题陈述看起来像 producer-consumer 问题,但需要注意的是您只需要一个消费者。

无需手动重新实现此类功能。 相反,我建议使用BlockingCollection - 在内部它使用ConcurrentQueue 和一个单独的线程用于消费。 请注意,这可能适合也可能不适合您的用例。

类似:

_blockingCollection = new BlockingCollection<your type>(); // you may want to create bounded or unbounded collection
_consumingThread = new Thread(() =>
{
    foreach (var workItem in _blockingCollection.GetConsumingEnumerable()) // blocks when there is no more work to do, continues whenever a new item is added.
    {
      // do work with workItem
     }
});
_consumingThread.Start();

多个生产者(任务或线程)可以将工作项添加到_blockingCollection 没有问题,并且无需担心同步生产者/消费者。

当你完成生产任务后,调用_blockingCollection.CompleteAdding()(这个方法不是线程安全的,所以建议提前停止所有生产者)。 可能,您还应该在某处执行 _consumingThread.Join() 以终止您的消费线程。

【讨论】:

    【解决方案2】:

    我会为此使用 Microsoft 的 Reactive Framework Team 的 Reactive Extensions (NuGet "System.Reactive")。这是一个可爱的抽象。

    public class ConcurrentLoop
    {
        private static Subject<MyJob> _jobs = new Subject<MyJob>();
    
        private static IDisposable _subscription =
            _jobs
                .Synchronize()
                .ObserveOn(Scheduler.Default)
                .Subscribe(job =>
                {
                    //Do something
                });
    
        public static void QueueJob(MyJob job)
        {
            _jobs.OnNext(job);
        }
    }
    

    这很好地将所有传入的作业同步到一个流中并将执行推送到Scheduler.Default(基本上是线程池),但是因为它已经序列化了所有输入,一次只能发生一个。这样做的好处是,如果值之间存在显着差距,它会释放线程。这是一个非常精简的解决方案。

    要清理,您只需致电_jobs.OnCompleted();_subscription.Dispose();

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2010-10-08
      • 1970-01-01
      • 2013-01-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-01-29
      • 1970-01-01
      相关资源
      最近更新 更多