【问题标题】:How can I guarantee continuations run in task completion order?我如何保证继续按任务完成顺序运行?
【发布时间】:2016-06-03 17:36:16
【问题描述】:

如果我的代码通过返回代表每个阶段的任务来抽象异步操作的分阶段序列,我如何确保继续按阶段顺序(即任务完成的顺序)执行?

请注意,这与简单的“不要浪费时间等待较慢的任务”不同。在调度中需要保证没有竞争条件的顺序。这个较宽松的要求可以通过以下问题的部分答案来解决:

  1. Sort Tasks into order of completition
  2. Is there default way to get first task that finished successfully?

认为合乎逻辑的解决方案是使用自定义 TaskScheduler(例如基于 SynchronizationContext 的)附加延续。但是,我无法保证调度在任务完成时同步执行。

在代码中,这可能类似于

class StagedOperationSource
{
    public TaskCompletionSource Connect = new TaskCompletionSource();
    public TaskCompletionSource Accept = new TaskCompletionSource();
    public TaskCompletionSource Complete = new TaskCompletionSource();
}
class StagedOperation
{
    public Task Connect, Accept, Complete;
    public StagedOperation(StagedOperationSource source)
    {
        Connect = source.Connect.Task;
        Accept = source.Accept.Task;
        Complete = source.Complete.Task;
    }
}
...
private StagedOperation InitiateStagedOperation(int opId)
{
    var source = new StagedOperationSource();
    Task.Run(GetRunnerFromOpId(opId, source));
    return new StagedOperation(source);
}
...
public RunOperations()
{
    for (int i=0; i<3; i++)
    {
        var op = InitiateStagedOperation(i);
        op.Connect.ContinueWith(t => Console.WriteLine("{0}: Connected", i));
        op.Accept.ContinueWith(t => Console.WriteLine("{0}: Accepted", i));
        op.Complete.ContinueWith(t => Console.WriteLine("{0}: Completed", i));
    }
}

应该产生类似于

的输出
0: Connected
1: Connected
0: Accepted
2: Connected
0: Completed
1: Accepted
2: Accepted
2: Completed
1: Completed

显然,该示例缺少详细信息,例如如果早期阶段失败,则将异常转发到(或取消)后期阶段,但这只是一个示例。

【问题讨论】:

  • 您可能想查看TPL Dataflow
  • 您可以使用像 waitone 这样的信号量或使用锁来确保一个任务在下一个任务开始之前完成。
  • @jdweng 那不会是异步的。为此,您必须为循环的每次迭代创建一个线程和信号量,从而使代码远远比它需要的更昂贵。
  • 不能完全异步运行,保证完成顺序。任务之间必须有一些同步,尤其是当您一遍又一遍地运行相同的任务时。你有一个状态机,你必须定义规则才能得到准确的结果。
  • @jdweng 我发布了一个完全异步运行的答案,并保证在您发布该评论前 12 分钟需要订购。除了 TPL 提供的同步之外,不需要同步以确保在作为其延续的任务完成后运行延续。

标签: c# asynchronous task-parallel-library task


【解决方案1】:

在进入下一个阶段之前,只需await...

public static async Task ProcessStagedOperation(StagedOperation operation, int i)
{
    await operation.Connect;
    Console.WriteLine("{0}: Connected", i);
    await operation.Accept;
    Console.WriteLine("{0}: Accepted", i);
    await operation.Complete;
    Console.WriteLine("{0}: Completed", i);
}

然后您可以在 for 循环中调用该方法。

【讨论】:

    【解决方案2】:

    如果您使用 TAP(任务异步编程),即 asyncawait,您可以使处理流程更加明显。在这种情况下,我将创建一个新方法来封装操作顺序:

    public async Task ProcessStagedOperation(StagedOperation op, int i)
    {
        await op.Connect;
        Console.WriteLine("{0}: Connected", i);
    
        await op.Accept;
        Console.WriteLine("{0}: Accepted", i)
    
        await op.Complete;
        Console.WriteLine("{0}: Completed", i)
    }
    

    现在你的处理循环被简化了一点:

    public async Task RunOperations()
    {
        List<Task> pendingOperations = new List<Task>();
    
        for (int i=0; i<3; i++)
        {
            var op = InitiateStagedOperation(i);
            pendingOperations.Add(ProcessStagedOperation(op, i));
        }
    
        await Task.WhenAll(pendingOperations); // finish
    }
    

    您现在有一个对任务对象的引用,您可以显式等待或简单地从另一个上下文中 await。 (或者你可以简单地忽略它)。我修改 RunOperations() 方法的方式允许您创建一个大型待处理任务队列,但在等待它们全部完成时不会阻塞。

    【讨论】:

    • 使用基于任务的编程仅仅意味着你正在使用任务。你不需要使用await 来做到这一点。 OP已经在使用 TPL。
    • 而 TAP 是建立在 TPL 之上的。您的回答也是使用await,所以我对是否需要评论感到困惑。
    • TAP只是基于使用任务的编程模型,TPL就是其中一种实现。就像我说的,基于任务的编程模型需要使用任务进行编程,它绝不需要使用await。我并不是说使用await 有什么问题,只是你说的“TAP 被定义为使用await”是错误的。您还建议 OP 开始使用 他已经在使用的模型
    • 我从未将 TAP定义为使用 await,但这些概念密切相关。它将帮助 OP 的 Google Fu 使用 TAP 或 async/await。无论如何,我们似乎是在这里从鼹鼠山中变成一座山。
    • for 循环是制作简化示例的产物,但您的 Task.WhenAll 对扩展实际提供的内容很有帮助。
    猜你喜欢
    • 1970-01-01
    • 2021-02-15
    • 2016-09-16
    • 1970-01-01
    • 1970-01-01
    • 2011-12-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多