【问题标题】:Thread-safe task queue await线程安全的任务队列等待
【发布时间】:2019-11-18 09:58:09
【问题描述】:

我想做一件简单的事情(假设ContinueWiththread-safe):

readonly Task _task = Task.CompletedTask;

// thread A: conditionally prolong the task
if(condition)
    _task.ContinueWith(o => ...);

// thread B: await for everything
await _task;

问题:在上面的代码中await _task立即返回,忽略是否有inner任务。

_task 可以延长 多个 ContinueWith 的扩展要求,我将如何等待所有这些都完成?


当然我可以尝试用旧的线程安全方式来做:

Task _task = Task.CompletedTask;
readonly object _lock = new object();

// thread A
if(condition)
    _lock(_lock)
        _task = _task.ContinueWith(o => ...); // storing the latest inner task

// thread B
lock(_lock)
    await _task;

或者通过将任务存储在线程安全集合中并使用Task.WaitAll

但我很好奇第一个 sn-p 是否有简单的修复方法?

【问题讨论】:

  • 您必须等待调用ContinueWith() 的任务返回,因此您必须执行_task = _task.ContinueWith(o => ...); 之类的操作(您不需要添加顺便说一句,锁定分配引用 - 如果这就是你正在做的所有事情,那是一个原子操作。)
  • 您可以在ContinueWith调用中指定TaskCreationOptions.AttachedToParent,以使父任务仅在子任务完成时完成。
  • 您可能希望使用TaskContinuationOptions.ExecuteSynchronously 参数运行Continuation。
  • 当分配给_task 时,您绝对确实需要一个锁,因为_task 是从一个线程写入并从另一个线程读取的。否则,线程 B 可能会在线程 A 重新分配它之前读取 _task 字段的值,并且即使在线程 A 重新分配它之后也可能缓存该值。
  • 如果可能,使用await 的方式更简洁:async Task Prolong() { if (condition) await ...; } 然后_task = Prolong()await _task

标签: c# multithreading async-await task continuewith


【解决方案1】:

你的旧版本没问题,除了await同时保持锁;您应该在持有锁的同时将 _task 复制到局部变量中,然后释放锁,然后再将 await

但 ContinueWith 工作流程不是 IMO 实现该逻辑的最佳方式。在 async-await 成为语言的一部分之前,ContinueWith 是在 .NET 4.0 中引入的。

在现代 C# 中,我认为最好这样做:

static async Task runTasks()
{
    if( condition1 )
        await handle1();
    if( condition2 )
        await handle2();
}
readonly Task _task = runTasks();

或者如果您希望它们并行运行:

IEnumerable<Task> parallelTasks()
{
    if( condition1 )
        yield return handle1();
    if( condition2 )
        yield return handle2();
}
readonly Task _task = Task.WhenAll( parallelTasks() );

附:如果条件是动态变化的,对于顺序版本,您应该在第一次等待之前将它们的值复制到局部变量。

更新:所以你是说你的线程 A 延长了该任务以响应一些动态发生的事件?如果是,您的旧方法还可以,简单且有效,只需修复 lock(){ await ... } 并发错误即可。

理论上,更简洁的方法可能类似于反应器模式,但不幸的是它要复杂得多。这是我的open source example 有点类似的东西。您不需要并发限制信号量,也不需要第二个队列,但是您需要公开一个 Task 类型的属性,使用 CompletedTask 对其进行初始化,并在发布第一个时用 post 方法中的TaskCompletionSource&lt;bool&gt; 创建的新任务替换,并在没有更多发布的任务时在 runAsyncProcessor 方法中完成该任务。

【讨论】:

  • 用专门的方法组织逻辑确实是聪明的设计(+1)。但在这种情况下,我必须向类添加更多成员才能评估单个方法中的条件。我最初的想法是直接在 A 中更新 _task(无需存储 _wasAOk 或其他东西)以便稍后仅在 B 中使用 await 来检查 _task
【解决方案2】:

在您的原始代码中,await _task; 立即返回,因为 task 已完成。你需要做的

_task = _task.ContinueWith(...);

您不必使用锁。您正在同一个线程中工作。在代码的“锁定”版本中,您确实使用了_task = _task.ContinueWith(...),这就是它起作用的原因,而不是因为锁定。


编辑:刚刚看到您想在一个线程中执行 _task = _task.ContinueWith(...);_task = _task.ContinueWith(...)

这在设计上是相当糟糕的。你永远不应该将线程和任务结合起来。您应该选择仅任务解决方案或仅线程解决方案。

如果您选择仅任务解决方案(推荐),只需创建一个良好的工作任务序列,然后在每个任务上根据前一个任务的结果决定如何进行。

如果您选择纯线程解决方案,您可能希望使用ManualResetEvent 句柄来同步您的任务。

【讨论】:

  • Op 不是“在同一个线程中工作”——_tasks 是从线程 A 写入,从线程 B 读取的
  • 第一段是对cmets的总结,我可以投赞成票。但我不同意第二段的每一句话。对“从不结合线程和任务” 有任何引用吗?顺便说一句,当我说另一个线程时,我并不是指专门创建线程,它可能是一个 UI 事件添加一些东西或另一个任务。
  • @Sinatr,您有权不同意,但这并不意味着您是正确的。 “从不结合线程和任务” - 任务可以在任何线程上继续,如果没有同步上下文,即使await 可以在另一个线程上继续。你现在开始明白我的意思了吗?同样,可靠的方法是构建适当的任务序列,而不是制作怪物意大利面。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-12-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-11-05
相关资源
最近更新 更多