【问题标题】:Unexpected behavior with await inside a ContinueWith block在 ContinueWith 块内等待的意外行为
【发布时间】:2023-04-04 07:32:01
【问题描述】:

我有一个稍微复杂的要求,即并行执行一些任务,并且必须等待其中一些任务完成才能继续。现在,我遇到了意外行为,当我有许多任务时,我想并行执行,但在 ContinueWith 处理程序中。我准备了一个小样本来说明这个问题:

var task1 = Task.Factory.StartNew(() =>
{
    Console.WriteLine("11");
    Thread.Sleep(1000);
    Console.WriteLine("12");
}).ContinueWith(async t =>
{
    Console.WriteLine("13");
    var innerTasks = new List<Task>();
    for (var i = 0; i < 10; i++)
    {
        var j = i;
        innerTasks.Add(Task.Factory.StartNew(() =>
        {
            Console.WriteLine("1_" + j + "_1");
            Thread.Sleep(500);
            Console.WriteLine("1_" + j + "_2");
        }));
    }
    await Task.WhenAll(innerTasks.ToArray());
    //Task.WaitAll(innerTasks.ToArray());
    Thread.Sleep(1000);
    Console.WriteLine("14");
});
var task2 = Task.Factory.StartNew(() =>
{
    Console.WriteLine("21");
    Thread.Sleep(1000);
    Console.WriteLine("22");
}).ContinueWith(t =>
{
    Console.WriteLine("23");
    Thread.Sleep(1000);
    Console.WriteLine("24");
});
Console.WriteLine("1");
await Task.WhenAll(task1, task2);
Console.WriteLine("2");

基本模式是: - 任务 1 应与任务 2 并行执行。 - 一旦第 1 部分的第一部分完成,它应该同时做更多的事情。我想完成,一旦一切都完成了。

我希望得到以下结果:

1 <- Start
11 / 21 <- The initial task start
12 / 22 <- The initial task end
13 / 23 <- The continuation task start
Some combinations of "1_[0..9]_[1..2]" and 24 <- the "inner" tasks of task 1 + the continuation of task 2 end
14 <- The end of the task 1 continuation
2 <- The end

相反,await Task.WhenAll(innerTasks.ToArray()); 不会“阻止”继续任务完成。因此,内部任务在外部await Task.WhenAll(task1, task2); 完成后执行。结果是这样的:

1 <- Start
11 / 21 <- The initial task start
12 / 22 <- The initial task end
13 / 23 <- The continuation task start
Some combinations of "1_[0..9]_[1..2]" and 24 <- the "inner" tasks of task 1 + the continuation of task 2 end
2 <- The end
Some more combinations of "1_[0..9]_[1..2]" <- the "inner" tasks of task 1
14 <- The end of the task 1 continuation

如果我改为使用Task.WaitAll(innerTasks.ToArray()),一切似乎都按预期工作。当然,我不想使用 WaitAll,所以我不会阻塞任何线程。

我的问题是:

  1. 为什么会出现这种意外行为?
  2. 如何在不阻塞任何线程的情况下补救这种情况?

非常感谢您的任何指点!

【问题讨论】:

  • 对问题 1 的回答是因为 ContinueWith 异步运行其内容,并且没有任何东西在等待它们;您正在等待 task1,其中 continueWith 不是其中的一部分。
  • 看起来您只想将“continueWith”部分内联到 task1(而不是 continueWith)以实现所需的结果。这就是你所说的。在任务 1 中执行这些步骤,然后执行这些其他任务。这不需要 continueWith。
  • 内联并不是一个真正的选择,因为第一个任务已经是异步的,需要在继续运行之前完成。当然,我可以使用包装任务,在其中,只需等待第一个操作,然后启动其他操作。但是,我将有两项任务,我认为一项就足够了。另一方面,ContinueWith 似乎就是通过返回Task&lt;Task&lt;T&gt;&gt; 来做到这一点的,所以我想这无关紧要,额外的任务来自哪里。如果我错了,请纠正我。

标签: c# multithreading asynchronous async-await


【解决方案1】:

您使用了错误的工具。代替StartNew,使用Task.Run。代替ContinueWith,使用await

var task1 = Task1();
var task2 = Task2();
Console.WriteLine("1");
await Task.WhenAll(task1, task2);
Console.WriteLine("2");

private async Task Task1()
{
  await Task.Run(() =>
  {
    Console.WriteLine("11");
    Thread.Sleep(1000);
    Console.WriteLine("12");
  });
  Console.WriteLine("13");
  var innerTasks = new List<Task>();
  for (var i = 0; i < 10; i++)
  {
    innerTasks.Add(Task.Run(() =>
    {
      Console.WriteLine("1_" + i + "_1");
      Thread.Sleep(500);
      Console.WriteLine("1_" + i + "_2");
    }));
    await Task.WhenAll(innerTasks);
  }
  Thread.Sleep(1000);
  Console.WriteLine("14");
}

private async Task Task2()
{
  await Task.Run(() =>
  {
    Console.WriteLine("21");
    Thread.Sleep(1000);
    Console.WriteLine("22");
  });
  Console.WriteLine("23");
  Thread.Sleep(1000);
  Console.WriteLine("24");
}

Task.Runawait 在这里更胜一筹,因为它们纠正了StartNew/ContinueWith 中的许多意外行为。特别是异步委托和(对于Task.Run)总是使用线程池。

我的博客上有更多关于why you shouldn't use StartNewwhy you shouldn't use ContinueWith 的详细信息。

【讨论】:

  • 谢谢你。我认为,它使代码更加简洁。关于Task.Run(..)Task.Factory.StartNew,我已经阅读了各种意见,我们甚至在使用Task.Run(..)时遇到了问题,在某些情况下切换到.StartNew(..),但是,我现在找不到解释。除了对代码进行一些重新排序(await Task.WhenAll(innerTasks); 应该在 for 循环之外),以及 i 的修改关闭问题之外,您的代码就像一个魅力。我错过了异步方法中的 await 会产生并将控制权返回给调用方法。
  • 我刚刚注意到你的代码只是重复了我的原始代码也有同样的错误,await Task.WhenAll(innerTasks); 在错误的位置,并且访问了修改后的闭包。我已经更新了我的代码以反映这一点。再次感谢,您的回答帮了大忙。
【解决方案2】:

in the comments 所述,您所看到的是正常的。当传递给ContinueWith() 并由ContinueWith() 调用的委托完成执行时,ContinueWith() 返回的Task 完成。这在匿名方法第一次使用await 语句时发生,并且委托返回一个Task 对象本身,它表示整个匿名方法的最终完成。

由于您只在等待ContinueWith() 任务,并且此任务仅代表代表匿名方法的任务的可用性,而不是该任务的完成,因此您的代码不会等等。

从您的示例中,尚不清楚最佳解决方案是什么。但如果你做这个小改动,它会做你想做的:

await Task.WhenAll(await task1, task2);

即在 WhenAll() 调用中,不要等待 ContinueWith() 任务本身,而是等待任务 that 最终将返回。在此处使用await 以避免在等待该任务可用时阻塞线程。

【讨论】:

  • 谢谢,我没有意识到.ContinueWith() 返回一个Task>。我想,它只是级联,结果任务将是链中的“最后一个”任务。但是,我当时不太明白的是,为什么使用 Task.WaitAll(...) 而不是 await Task.WhenAll(...) 也会产生预期的结果。我想,这应该没什么区别,因为这个“等待”也在发生,当原始任务已经完成时。如果你能解开这个谜,我很乐意接受你的回答。
  • “为什么使用 Task.WaitAll(...)... 也会产生预期的结果” - 您的帖子仅提及使用 WaitAll(innerTasks.ToArray())。这样做可以防止错误发生的原因是这样做会删除您的匿名方法中导致它在方法完成之前返回的await 语句。如果您只是WaitAll(),则该方法会阻塞线程,直到整个方法完成(包括对WaitAll() 的调用),而不是返回,稍后在await 表达式处继续。
  • 嗯,我想,我仍然不太了解这里涉及的概念。从您到目前为止所写的内容来看,我认为 Task 上的 .WhenAll() 会等待原始任务完成然后完成。但是,现在您说的是,在外部任务被标记为已完成之前,内部任务已被执行。如果我不在延续中使用await,它只会在整个延续运行后将外部任务标记为已完成。但是,如果我这样做了,一旦内部任务开始等待另一个异步结果,它会将外部任务标记为已完成?
  • 您将“方法已完成”与“方法已返回”混淆了。理解这种区别非常很重要。就像 C# 迭代器方法可以在它完成之前返回(即通过yield return)一样,async 方法也可以通过await . async 方法不返回其结果;它返回一个代表该结果的 task。当方法从其初始调用者返回时,该任务可能不会(通常不会)完成。稍后,该方法将完成,之前返回的任务的观察者可以看到它何时发生。
  • 非常感谢您解释收益与回报部分。我还没有完全理解这一点。这也解释了为什么 @stephen-cleary 的解决方案会以它的方式工作。我原以为,调用方法也会在继续 task2 初始化之前等待,但这清楚地解释了这种行为。
【解决方案3】:

当使用带有StartNew 的异步方法/lambdas 时,您可以等待返回的任务包含的任务:

var task = Task.Factory.StartNew(async () => { /* ... */ });
task.Wait();
task.Result.Wait();
// consume task.Result.Result

或者你对StartNew的结果使用扩展方法Unwrap并等待它返回的任务。

var task = Task.Factory.StartNew(async () => { /* ... */ })
    .Unwrap();
task.Wait();
// consume task.Result

以下讨论的思路是,在特定情况下应避免使用 Task.Factory.StartNewContinueWith,例如当您不提供创建或继续选项或不提供任务调度程序时。

我不同意不应使用Task.Factory.StartNew,我同意您应该使用(或考虑使用)Task.Run,无论您使用不采用TaskCreationOptionsTask.Factory.StartNew 方法重载还是TaskScheduler

请注意,这只适用于默认的Task.Factory。我使用了自定义任务工厂,我选择使用不带选项和任务调度程序的 StartNew 重载,因为我根据需要配置了工厂特定的默认值。

同样,我不同意不应使用 ContinueWith,我同意您应该使用(或考虑使用)async/await,无论您在哪里使用 ContinueWith 方法重载不要使用TaskContinuationOptionsTaskScheduler

例如,直到 C# 5,解决 awaitcatchfinally 块中不受支持的限制的最实用方法是使用 ContinueWith

C# 6:

try
{
    return await something;
}
catch (SpecificException ex)
{
    await somethingElse;
    // throw;
}
finally
{
    await cleanup;
}

在 C# 6 之前等效:

return await something
    .ContinueWith(async somethingTask =>
    {
        var ex = somethingTask.Exception.InnerException as SpecificException;
        if (ex != null)
        {
            await somethingElse;
            // await somethingTask;
        }
    },
        CancellationToken.None,
        TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.NotOnRanToCompletion,
        TaskScheduler.Default)
    .Unwrap()
    .ContinueWith(async catchTask =>
    {
        await cleanup;
        await catchTask;
    },
        CancellationToken.None,
        TaskContinuationOptions.DenyChildAttach,
        TaskScheduler.Default)
    .Unwrap();

因为,正如我所说,在某些情况下,我有一个具有特定默认值的 TaskFactory,所以我定义了一些采用 TaskFactory 的扩展方法,从而减少了不传递其中一个参数的错误机会(我知道我总是会忘记通过工厂本身):

public static Task ContinueWhen(this TaskFactory taskFactory, Task task, Action<Task> continuationAction)
{
    return task.ContinueWith(continuationAction, taskFactory.CancellationToken, taskFactory.ContinuationOptions, taskFactory.Scheduler);
}

public static Task<TResult> ContinueWhen<TResult>(this TaskFactory taskFactory, Task task, Func<Task, TResult> continuationFunction)
{
    return task.ContinueWith(continuationFunction, taskFactory.CancellationToken, taskFactory.ContinuationOptions, taskFactory.Scheduler);
}

// Repeat with argument combinations:
// - Task<TResult> task (instead of non-generic Task task)
// - object state
// - bool notOnRanToCompletion (useful in C# before 6)

用法:

// using namespace that contains static task extensions class
var task = taskFactory.ContinueWhen(existsingTask, t => Continue(a, b, c));
var asyncTask = taskFactory.ContinueWhen(existingTask, async t => await ContinueAsync(a, b, c))
    .Unwrap();

我决定不模仿Task.Run,不重载相同的方法名称来解包返回任务的委托,这并不总是你想要的。实际上,我什至没有实现ContinueWhenAsync 扩展方法,所以你需要使用Unwrap 或两个awaits。

通常,这些延续是 I/O 异步操作,并且预处理和后处理开销应该很小,以至于您不必关心它是否开始同步运行直到第一个让步点,或者即使它完成同步(例如,使用底层MemoryStream 或模拟数据库访问)。此外,它们中的大多数不依赖于同步上下文。

每当您应用Unwrap 扩展方法或两个awaits 时,您应该检查任务是否属于此类别。如果是这样,async/await 很可能是比开始任务更好的选择。

对于具有不可忽略的同步开销的异步操作,启动一个新任务可能更可取。即便如此,如果您的代码从一开始就是异步的,例如 async 由框架或主机调用的方法(ASP.NET、WCF、NServiceBus 6+ 等),因为 开销 是您的实际业务。对于长时间处理,您可以考虑谨慎使用Task.Yield。异步代码的一个原则是不要太细粒度,但是,太粗粒度同样糟糕:一组繁重的任务可能会阻止处理排队的轻量级任务。

如果异步操作依赖于同步上下文,你仍然可以使用async/await,如果你在那个上下文中(在这种情况下,在使用.ConfigureAwait(false)之前请三思而后行),否则,开始使用来自相应同步上下文的任务调度程序的新任务。

【讨论】:

  • 非常感谢您的详细解释。这确实有助于理解所涉及的所有概念。但是,我会将@stephen-cleary 的答案保留为正确答案,因为它解决了我的问题并使代码简洁明了。
猜你喜欢
  • 1970-01-01
  • 2013-09-29
  • 2013-12-25
  • 2014-02-21
  • 1970-01-01
  • 2018-07-08
  • 2018-08-04
  • 1970-01-01
  • 2021-12-16
相关资源
最近更新 更多