【问题标题】:Sequential processing of asynchronous tasks异步任务的顺序处理
【发布时间】:2013-01-31 16:41:17
【问题描述】:

假设以下同步代码:

try
{
    Foo();
    Bar();
    Fubar();
    Console.WriteLine("All done");
}
catch(Exception e) // For illustration purposes only. Catch specific exceptions!
{
    Console.WriteLine(e);
}

现在假设所有这些方法都有一个 Async 对应方法,并且出于某种原因我必须使用它们,所以简单地将整个东西包装在一个新任务中不是一种选择。
我将如何实现相同的行为?
我所说的“相同”是:

  1. 如果抛出异常,则执行异常处理程序。
  2. 如果抛出异常,则停止执行以下方法。

我唯一能想到的是可怕

var fooTask = FooAsync();
fooTask.ContinueWith(t => HandleError(t.Exception),
                     TaskContinuationOptions.OnlyOnFaulted);
fooTask.ContinueWith(
    t =>
    {
        var barTask = BarAsync();
        barTask.ContinueWith(t => HandleError(t.Exception),
                             TaskContinuationOptions.OnlyOnFaulted);
        barTask.ContinueWith(
            t =>
            {
                var fubarTask = FubarAsync();
                fubarTask.ContinueWith(t => HandleError(t.Exception),
                                       TaskContinuationOptions.OnlyOnFaulted);
                fubarTask.ContinueWith(
                    t => Console.WriteLine("All done"),
                    TaskContinuationOptions.OnlyOnRanToCompletion);
            }, 
            TaskContinuationOptions.OnlyOnRanToCompletion);
    }, 
    TaskContinuationOptions.OnlyOnRanToCompletion);

请注意:

  • 我需要一个适用于.NET 4 的解决方案,所以async/await 是不可能的。但是,如果它可以与 async/await 一起使用,请随时展示如何。
  • 我不需要使用 TPL。如果 TPL 不可能,另一种方法也可以,也许使用 Reactive Extensions?

【问题讨论】:

  • 不认为有可能编写一个辅助方法,它接受一个params Action[] 参数以及一个完整/错误处理程序,它从中为您构建所有这些讨厌的东西?
  • @ChrisSinclair:好主意。如果没有“真正的”解决方案,这就是我要做的……
  • 您还需要为每个任务添加一个延续来处理Canceled,如果有任何任务是可取消的。
  • @Servy:假设它们不能被取消。
  • @ChrisSinclair 这不是Action[],而是Task[]。基本上是一个包含异常处理的ForEachAsync

标签: c# .net .net-4.0 task-parallel-library


【解决方案1】:

这是async 的工作方式:

try
{
    await FooAsync();
    await BarAsync();
    await FubarAsync();
    Console.WriteLine("All done");
}
catch(Exception e) // For illustration purposes only. Catch specific exceptions!
{
    Console.WriteLine(e);
}

如果您安装了(预发行版)Microsoft.Bcl.Async package,这将适用于 .NET 4.0。


由于你卡在 VS2010 上,你可以使用 Stephen Toub's Then 的变体:

public static Task Then(this Task first, Func<Task> next)
{
  var tcs = new TaskCompletionSource<object>();
  first.ContinueWith(_ =>
  {
    if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
    else if (first.IsCanceled) tcs.TrySetCanceled();
    else
    {
      try
      {
        next().ContinueWith(t =>
        {
          if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
          else if (t.IsCanceled) tcs.TrySetCanceled();
          else tcs.TrySetResult(null);
        }, TaskContinuationOptions.ExecuteSynchronously);
      }
      catch (Exception exc) { tcs.TrySetException(exc); }
    }
  }, TaskContinuationOptions.ExecuteSynchronously);
  return tcs.Task; 
}

你可以这样使用它:

var task = FooAsync().Then(() => BarAsync()).Then(() => FubarAsync());
task.ContinueWith(t =>
{
  if (t.IsFaulted || t.IsCanceled)
  {
    var e = t.Exception.InnerException;
    // exception handling
  }
  else
  {
    Console.WriteLine("All done");
  }
}, TaskContinuationOptions.ExcecuteSynchronously);

使用 Rx,它看起来像这样(假设您没有已经公开为 IObservable&lt;Unit&gt;async 方法):

FooAsync().ToObservable()
    .SelectMany(_ => BarAsync().ToObservable())
    .SelectMany(_ => FubarAsync().ToObservable())
    .Subscribe(_ => { Console.WriteLine("All done"); },
        e => { Console.WriteLine(e); });

我想。无论如何,我不是 Rx 大师。 :)

【讨论】:

  • @StephenCleary:关于 .NET 4 的替代方式有什么想法吗?
  • @DanielHilgarth:Stephen Toub 有一个blog post,这在这里会很有帮助;他实现了一个Then 扩展方法,以实现更自然的异步方法顺序链接。
  • 在Then的实现中,有一个未定义的变量t。
【解决方案2】:

为了完整起见,这就是我将如何实现 Chris Sinclair 建议的辅助方法:

public void RunSequential(Action onComplete, Action<Exception> errorHandler,
                          params Func<Task>[] actions)
{
    RunSequential(onComplete, errorHandler,
                  actions.AsEnumerable().GetEnumerator());
}

public void RunSequential(Action onComplete, Action<Exception> errorHandler,
                          IEnumerator<Func<Task>> actions)
{
    if(!actions.MoveNext())
    {
        onComplete();
        return;
    }

    var task = actions.Current();
    task.ContinueWith(t => errorHandler(t.Exception),
                      TaskContinuationOptions.OnlyOnFaulted);
    task.ContinueWith(t => RunSequential(onComplete, errorHandler, actions),
                      TaskContinuationOptions.OnlyOnRanToCompletion);
}

这可确保仅在前一个任务成功完成时才请求每个后续任务。
它假定Func&lt;Task&gt; 返回一个已经在运行的任务。

【讨论】:

  • 如果你明白我的意思,我想我终于找到了一些东西。感谢您的实施/问题!
【解决方案3】:

您在这里拥有的本质上是ForEachAsync。您希望按顺序运行每个异步项目,但需要一些错误处理支持。这是一个这样的实现:

public static Task ForEachAsync(IEnumerable<Func<Task>> tasks)
{
    var tcs = new TaskCompletionSource<bool>();

    Task currentTask = Task.FromResult(false);

    foreach (Func<Task> function in tasks)
    {
        currentTask.ContinueWith(t => tcs.TrySetException(t.Exception.InnerExceptions)
            , TaskContinuationOptions.OnlyOnFaulted);
        currentTask.ContinueWith(t => tcs.TrySetCanceled()
                , TaskContinuationOptions.OnlyOnCanceled);
        Task<Task> continuation = currentTask.ContinueWith(t => function()
            , TaskContinuationOptions.OnlyOnRanToCompletion);
        currentTask = continuation.Unwrap();
    }

    currentTask.ContinueWith(t => tcs.TrySetException(t.Exception.InnerExceptions)
            , TaskContinuationOptions.OnlyOnFaulted);
    currentTask.ContinueWith(t => tcs.TrySetCanceled()
            , TaskContinuationOptions.OnlyOnCanceled);
    currentTask.ContinueWith(t => tcs.TrySetResult(true)
            , TaskContinuationOptions.OnlyOnRanToCompletion);

    return tcs.Task;
}

我也添加了对取消任务的支持,只是为了更通用,因为它需要做的事情很少。

它将每个任务添加为前一个任务的延续,并且始终确保任何异常都会导致设置最终任务的异常。

这是一个示例用法:

public static Task FooAsync()
{
    Console.WriteLine("Started Foo");
    return Task.Delay(1000)
        .ContinueWith(t => Console.WriteLine("Finished Foo"));
}

public static Task BarAsync()
{
    return Task.Factory.StartNew(() => { throw new Exception(); });
}

private static void Main(string[] args)
{
    List<Func<Task>> list = new List<Func<Task>>();

    list.Add(() => FooAsync());
    list.Add(() => FooAsync());
    list.Add(() => FooAsync());
    list.Add(() => FooAsync());
    list.Add(() => BarAsync());

    Task task = ForEachAsync(list);

    task.ContinueWith(t => Console.WriteLine(t.Exception.ToString())
        , TaskContinuationOptions.OnlyOnFaulted);
    task.ContinueWith(t => Console.WriteLine("Done!")
        , TaskContinuationOptions.OnlyOnRanToCompletion);
}

【讨论】:

  • 在我的术语中不会称这种异步任务的顺序执行。这是异步任务的顺序启动,但其中没有任何东西可以确保它们一个接一个地执行,这在我的脑海中意味着顺序执行。
  • @GudlaugurEgilsson 但这正是它的作用。每个都作为另一个的延续添加,所以第二个在第一个完成之前不能开始,第三个在第二个完成之前不能开始,等等。
  • 我的立场是正确的,再次尝试后,这完全符合我的预期。似乎问题出在我用来测试它的任务中。谢谢!附:将“function”重命名为“taskRunner”将极大地提高这段代码的可读性;-)
  • 在我的测试中加入这一点时,我发现这种将循环包装到延续任务中的方法会导致如果子任务抛出异常,聚合任务总是被取消。这可以通过将 foreach 循环中的倒数第二行更改为:Task&lt;Task&gt; continuation = currentTask.ContinueWith(t =&gt; { if (t.IsFaulted || t.IsCanceled) { return t; } return function(); }); 来纠正
  • @GudlaugurEgilsson 该过程的目标是确保在出现异常或取消时不会运行进一步的操作。我明确地不遗余力地做到这一点。如果您不想要这种行为,只需删除取消/错误的延续,并使触发下一个任务的延续成为无条件的。
【解决方案4】:

您应该能够创建一个方法来组合两个任务,并且只有在第一个成功时才启动第二个。

public static Task Then(this Task parent, Task next)
{
    TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
    parent.ContinueWith(pt =>
    {
        if (pt.IsFaulted)
        {
            tcs.SetException(pt.Exception.InnerException);
        }
        else
        {
            next.ContinueWith(nt =>
            {
                if (nt.IsFaulted)
                {
                    tcs.SetException(nt.Exception.InnerException);
                }
                else { tcs.SetResult(null); }
            });
            next.Start();
        }
    });
    return tcs.Task;
}

然后您可以将任务链接在一起:

Task outer = FooAsync()
    .Then(BarAsync())
    .Then(FubarAsync());

outer.ContinueWith(t => {
    if(t.IsFaulted) {
        //handle exception
    }
});

如果您的任务立即开始,您可以将它们包装在 Func: 中:

public static Task Then(this Task parent, Func<Task> nextFunc)
{
    TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
    parent.ContinueWith(pt =>
    {
        if (pt.IsFaulted)
        {
            tcs.SetException(pt.Exception.InnerException);
        }
        else
        {
            Task next = nextFunc();
            next.ContinueWith(nt =>
            {
                if (nt.IsFaulted)
                {
                    tcs.SetException(nt.Exception.InnerException);
                }
                else { tcs.SetResult(null); }
            });
        }
    });
    return tcs.Task;
}

【讨论】:

  • 感谢您的回答。就像 Chris Sinclair 的回答一样,您假设从方法返回的任务没有启动。对于大多数 API,这是不正确的。已经在方法内部启动了。因此,在您的代码中 - 以及在 Chris 中 - 任务并行运行。一旦第一个任务完成,整个事情就会出错,因为它会尝试启动一个已经运行或完成的任务。
  • @DanielHilgarth - 抱歉,我在发帖前没有阅读他的回答。显而易见的事情就是使用Func&lt;Task&gt;,然后你可以使用FooAsync().Then(BarAsync).Then(FubarAsync)
  • 没错。这也是我在回答中所做的。
【解决方案5】:

现在,我并没有真正使用过 TPL,所以这只是在黑暗中尝试。鉴于@Servy 提到的内容,也许这不会完全异步运行。但我想我会发布它,如果它方式偏离了标记,你可以投票否决我或者我可以删除它(或者我们可以修复需要修复的内容)

public void RunAsync(Action onComplete, Action<Exception> errorHandler, params Action[] actions)
{
    if (actions.Length == 0)
    {
        //what to do when no actions/tasks provided?
        onComplete();
        return;
    }

    List<Task> tasks = new List<Task>(actions.Length);
    foreach(var action in actions)
    {
        Task task = new Task(action);
        task.ContinueWith(t => errorHandler(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
        tasks.Add(task);
    }

    //last task calls onComplete
    tasks[actions.Length - 1].ContinueWith(t => onComplete(), TaskContinuationOptions.OnlyOnRanToCompletion);

    //wire all tasks to execute the next one, except of course, the last task
    for (int i = 0; i <= actions.Length - 2; i++)
    {
        var nextTask = tasks[i + 1];
        tasks[i].ContinueWith(t => nextTask.Start(), TaskContinuationOptions.OnlyOnRanToCompletion);
    }

    tasks[0].Start();
}

它的用法如下:

RunAsync(() => Console.WriteLine("All done"),
            ex => Console.WriteLine(ex),
            Foo,
            Bar,
            Fubar);

想法?否决票? :)

(我当然更喜欢 async/await)

编辑:根据您的 cmets 采取Func&lt;Task&gt;,这将是一个正确的实现吗?

public void RunAsync(Action onComplete, Action<Exception> errorHandler, params Func<Task>[] actions)
{
    if (actions.Length == 0)
    {
        //what to do when no actions/tasks provided?
        onComplete();
        return;
    }

    List<Task> tasks = new List<Task>(actions.Length);
    foreach (var action in actions)
    {
        Func<Task> nextActionFunc = action;
        Task task = new Task(() =>
        {
            var nextTask = nextActionFunc();
            nextTask.ContinueWith(t => errorHandler(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
            nextTask.Start();
        });
        tasks.Add(task);
    }

    //last task calls onComplete
    tasks[actions.Length - 1].ContinueWith(t => onComplete(), TaskContinuationOptions.OnlyOnRanToCompletion);

    //wire all tasks to execute the next one, except of course, the last task
    for (int i = 0; i <= actions.Length - 2; i++)
    {
        var nextTask = tasks[i + 1];
        tasks[i].ContinueWith(t => nextTask.Start(), TaskContinuationOptions.OnlyOnRanToCompletion);
    }

    tasks[0].Start();
}

【讨论】:

  • 这在我的场景中不起作用,因为我所拥有的只是返回已经运行任务的方法。因此,我无法在新任务中包含同步操作。因此,您的方法确实需要使用 params Func&lt;Task&gt;[] 并使用它。
  • @DanielHilgarth 如果该方法采用Task 而不是Action 怎么办?
  • 不会有帮助,因为这意味着在调用 RunAsync 时所有任务都已经在运行,这正是我想要的。
  • @DanielHilgarth 正如之前在 cmets 中所说,您应该接受 Func&lt;Task&gt;。该函数可以返回一个已经运行的任务,或者启动一个新任务并返回它。
  • @ChrisSinclair:不,不是。它仍然一次性迭代任务。似乎假设代表返回的任务尚未启动。在我的情况下——我猜在大多数情况下——情况并非如此。请参阅my answer 了解它对我的作用。您的解决方案将并行运行这些任务,并在第一个任务完成后立即出错,因为它尝试启动已在运行或已完成的任务。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-04-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多