【问题标题】:Parallel.For vs regular threadsParallel.For 与常规线程
【发布时间】:2012-10-15 17:45:37
【问题描述】:

我试图理解为什么 Parallel.For 在以下场景中能够胜过多个线程:考虑可以并行处理的一批作业。在处理这些工作时,可能会添加新工作,然后也需要处理这些工作。 Parallel.For 解决方案如下所示:

var jobs = new List<Job> { firstJob };
int startIdx = 0, endIdx = jobs.Count;
while (startIdx < endIdx) {
  Parallel.For(startIdx, endIdx, i => WorkJob(jobs[i]));
  startIdx = endIdx; endIdx = jobs.Count;
}

这意味着 Parallel.For 需要多次同步。考虑一个面包优先的图算法算法;同步的数量会很大。浪费时间,不是吗?

在老式线程方法中尝试相同的方法:

var queue = new ConcurrentQueue<Job> { firstJob };
var threads = new List<Thread>();
var waitHandle = new AutoResetEvent(false);
int numBusy = 0;
for (int i = 0; i < maxThreads; i++) 
  threads.Add(new Thread(new ThreadStart(delegate {
    while (!queue.IsEmpty || numBusy > 0) {
      if (queue.IsEmpty)
        // numbusy > 0 implies more data may arrive
        waitHandle.WaitOne();

      Job job;
      if (queue.TryDequeue(out job)) {
        Interlocked.Increment(ref numBusy);
        WorkJob(job); // WorkJob does a waitHandle.Set() when more work was found
        Interlocked.Decrement(ref numBusy);
      }
    }
    // others are possibly waiting for us to enable more work which won't happen
    waitHandle.Set(); 
})));
threads.ForEach(t => t.Start());
threads.ForEach(t => t.Join());

Parallel.For 代码当然更干净,但我无法理解的是,它也更快!任务调度器就这么好吗?同步被消除了,没有忙碌的等待,但线程方法始终较慢(对我而言)。这是怎么回事?线程方法可以更快吗?

编辑:感谢所有答案,我希望我可以选择多个。我选择了那个也显示出实际可能改进的那个。

【问题讨论】:

  • 如果已经有更干净、更快速的解决方案,为什么还要尝试让它更快?
  • 因为有一个明显的缺陷可以消除,我认为。
  • 您确实意识到代码的两个 sn-ps 就像白天和黑夜一样不同,对吧?一方面,您有一个静态的、已知数量的列表作业(在 startIdx 和 endIdx 之间)上的并行循环,另一方面,您有一组线程与队列和等待事件竞争,有几个互锁的传播介于两者之间。换句话说,线程代码是cache invalidation galore
  • @Remus:谢谢,又一个好收获。

标签: c# multithreading performance


【解决方案1】:

这两个代码示例并不完全相同。

Parallel.ForEach() 将使用有限数量的线程并重复使用它们。第二个示例已经开始落后,因为必须创建多个线程。这需要时间。

maxThreads 的值是多少?非常关键,Parallel.ForEach() 是动态的。

任务调度器就这么好吗?

还不错。 TPL 使用工作窃取和其他自适应技术。你将很难做得更好。

【讨论】:

  • 线程示例也重用了它创建的线程。它开始的数量有限,如果你是这个意思,不是每个工作都有一个。
  • 打败我,使用线程池与不使用。 stackoverflow.com/questions/230003/thread-vs-threadpool
  • @Henk:我想我会把它设置在 2 到 10 之间。
【解决方案2】:

Parallel.For 实际上并没有将项目分解为单个工作单元。它根据计划使用的线程数和要执行的迭代次数分解所有工作(早期)。然后让每个线程同步处理该批次(可能使用工作窃取或保存一些额外的项目以在接近尾声时进行负载平衡)。通过使用这种方法,工作线程实际上从不相互等待,而由于您在每次迭代之前/之后使用的大量同步,您的线程会不断地相互等待。

最重要的是,由于它使用线程池线程,它需要的许多线程可能已经创建,这是它的另一个优势。

至于同步,Parallel.For 的全部意义在于所有迭代都可以并行完成,因此几乎不需要进行同步(至少在他们的代码中)。

当然还有线程数的问题。线程池有很多非常好的算法和启发式方法来帮助它根据当前硬件、来自其他应用程序的负载等确定在那个时刻需要多少线程。这可能是您使用的线程过多或不足。

另外,由于在开始之前不知道您拥有的项目数量,我建议使用Parallel.ForEach 而不是几个Parallel.For 循环。它只是为您所处的情况而设计的,因此它的启发式方法会更好地应用。 (这也使得代码更加简洁。)

BlockingCollection<Job> queue = new BlockingCollection<Job>();

//add jobs to queue, possibly in another thread
//call queue.CompleteAdding() when there are no more jobs to run

Parallel.ForEach(queue.GetConsumingEnumerable(),
    job => job.DoWork());

【讨论】:

  • 实际上这种方法似乎是不可能的,因为你不知道什么时候打电话给queue.CompleteAdding()。这仅适用于队列为空且没有人在处理任何其他项目时。
  • @FrankRazenberg 不。当没有更多要添加的项目时,您只需致电 CompleteAdding。您无需等待它为空或没有更多项目正在处理中。 BlockingCollection 已经处理好了。 CompleteAdding 只是意味着枚举器不会再​​向其内部集合添加任何项目,因此当它添加时,最终会吐出它应该中断的最后一个,而不是阻塞并等待更多项目。
  • 但是您如何知道何时/何地调用 CompleteAdding()?它只能调用一次,对吧?
  • @FrankRazenberg 当您没有更多要添加的项目时,您将调用CompleteAdding。似乎您对工作的处理增加了更多工作;如果是这种情况,那么我建议尽可能将其重构为生产者和消费者。有一个线程/任务,它只创建 Job 项目来处理并将它们添加到阻塞集合中,然后我展示的代码可以对每个作业进行实际处理。
  • 只有在所有工作都完成处理后,我们才能知道是否没有更多工作(即我的线程生成方法的 while 子句)。我想这是不可能的:(
【解决方案3】:

您创建了一堆新线程,而 Parallel.For 正在使用线程池。如果您使用 C# 线程池,您会看到更好的性能,但这样做确实没有意义。

我会回避推出您自己的解决方案;如果有需要自定义的特殊情况,请使用 TPL 并自定义..

【讨论】:

    猜你喜欢
    • 2015-11-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-11
    • 1970-01-01
    • 1970-01-01
    • 2013-09-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多