【问题标题】:Let threads wait for all tasks to complete before starting on the next set of tasks让线程在开始下一组任务之前等待所有任务完成
【发布时间】:2020-08-23 12:16:13
【问题描述】:

我有一个由多个阶段组成的管道。同一阶段的作业可以并行处理。但是必须先完成第 1 阶段的所有工作,然后才能开始从事第 2 阶段的工作,等等。

我正在考虑使用 CountDownEvent 同步这项工作。

我的基本结构是

this.WorkerCountdownEvent = new CountdownEvent(MaxJobsInStage);
this.WorkerCountdownEvent.Signal(MaxJobsInStage); // Starts all threads
// Each thread runs the following code

for (this.currentStage = 0; this.currentStage < this.PipelineStages.Count; this.currentStage++)
{
    this.WorkerCountdownEvent.Wait();
    var stage = this.PipelineStages[this.currentStage];
    if (stage.Systems.Count < threadIndex)
    {
        var system = stage.Systems[threadIndex];
        system.Process();
    }

    this.WorkerCountdownEvent.Signal(); // <--

}

这适用于处理一个阶段。但是到达this.WorkerCountdownEvent.Signal() 的第一个线程将导致应用程序崩溃,因为它试图将信号减小到零以下。

当然,如果我想阻止这种情况,并且让工作再次等待,我必须致电this.WorkerCountdownEvent.Reset()。但是我必须在所有线程开始工作之后调用它,但在一个线程完成它的工作之前。这似乎是一项不可能完成的任务?

我是否使用了错误的同步原语?或者我应该使用两个倒计时事件?还是我完全错过了什么?

(顺便说一句,工作通常需要不到一毫秒的时间,所以如果有人有更好的方法来使用像 ManualResetEventSlim 这样的“苗条”原语来做到这一点,则可以加分。线程池或任务不是我正在寻找的方向,因为这些线程将持续很长时间(数小时),并且需要每秒通过管道 60 次。因此,停止/启动任务的开销在这里相当大)。

编辑:这个问题被标记为两个问题的重复。其中一个问题的答案是“使用 thread.Join()”,另一个问题是“使用 TPL”,这两个答案(在我看来)显然不是关于流水线和线程原语的问题的答案,例如 CountDownEvent .

【问题讨论】:

  • 如果你正在使用任务,为什么不直接使用Task.WhenAll。例如调用await Task.WhenAll(stage1Tasks),然后在完成后启动stage2任务。
  • 我建议从您的问题中删除“任务”一词的所有实例,因为您指的不是Task 对象,而Tasks 目前作为解决相关问题的工具非常流行异步和并行。因此,在不提及Tasks 的情况下谈论“任务”会造成混乱。
  • 我试图让它更清晰,并用删除所有提到的单词 task 替换免责声明,并解释为什么 Task 不合适。但不幸的是,在我看来,这个问题已经作为两个非常不同的问题的副本而被关闭了。
  • 我通过创建自己的CountDownEvent 版本解决了这个问题,不幸的是我无法发布答案......所以我希望这个链接能保持一段时间:github.com/roy-t/EntitySystemTest/blob/master/Project/Threading/…
  • 是的,我什至将它标记为版主注意,但没有结果。顺便说一句,这是一个很好的技巧:)。

标签: c# multithreading .net-core


【解决方案1】:

我认为最适合这种情况的同步原语是Barrier

使多个任务能够通过多个阶段并行协作处理算法。

使用示例:

private Barrier _barrier = new Barrier(this.WorkersCount);

// Each worker thread runs the following code
for (i = 0; i < this.StagesCount; i++)
{
    // Here goes the work of a single worker for a single stage...
    _barrier.SignalAndWait();
}

更新:如果您希望工作人员异步等待信号,可以使用 AsyncBarrier 实现 here

【讨论】:

    猜你喜欢
    • 2012-01-17
    • 1970-01-01
    • 2013-03-22
    • 1970-01-01
    • 1970-01-01
    • 2020-03-15
    • 2011-06-05
    • 2016-03-23
    • 1970-01-01
    相关资源
    最近更新 更多