【问题标题】:Asynchronous Task.WhenAll with timeout异步 Task.WhenAll 超时
【发布时间】:2012-03-23 21:19:57
【问题描述】:

在新的 async dotnet 4.5 库中是否有办法在 Task.WhenAll 方法上设置超时?我想获取几个来源,并在 5 秒后停止,并跳过未完成的来源。

【问题讨论】:

    标签: c# asynchronous async-await task-parallel-library


    【解决方案1】:

    您可以使用Task.WhenAny() 将生成的TaskTask.Delay() 结合起来:

    await Task.WhenAny(Task.WhenAll(tasks), Task.Delay(timeout));
    

    如果您想在超时的情况下收获已完成的任务:

    var completedResults =
      tasks
      .Where(t => t.Status == TaskStatus.RanToCompletion)
      .Select(t => t.Result)
      .ToList();
    

    【讨论】:

    • 这获得了最多的支持,但我们知道这是否是实现这一目标的有效方法吗?
    • @CitadelCSAlum 你是什么意思?此代码执行所要求的操作。不信可以看文档或者自己试试。
    • 虽然这是公认的答案,但它是否完全符合问题中的描述?如果我理解正确,如果在所有任务完成之前发生超时,则不会收到任何结果(即使某些任务已完成)。我对么?我一直在寻找可以从多个任务中提取结果的东西——只接受那些超过超时的任务,不管其余任务是否未能这样做。请参阅下面的答案。
    • @ErezCohen 你是对的。我想我主要回答了问题的标题而不是正文(尤其是“跳过未完成的来源”部分)。
    • @James South 这两个 sn-ps 可以通过在第一个之后调用第二个来组合。首先等待,然后收集已完成任务的结果。有可能所有任务都将完成,或者只完成相同的任务,或者一个都不完成。
    【解决方案2】:

    我认为does exception handling right 的更清晰、更强大的选项是在每个任务上使用Task.WhenAnytimeout task,检查所有已完成的任务并过滤掉超时任务,然后使用@987654328 @ 而不是 Task.Result 来收集所有结果。

    这是一个完整的工作解决方案:

    static async Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks, TimeSpan timeout)
    {
        var timeoutTask = Task.Delay(timeout).ContinueWith(_ => default(TResult));
        var completedTasks = 
            (await Task.WhenAll(tasks.Select(task => Task.WhenAny(task, timeoutTask)))).
            Where(task => task != timeoutTask);
        return await Task.WhenAll(completedTasks);
    }
    

    【讨论】:

    • 有两个WhenAll,有没有性能问题?第二个 WhenAll 是拆箱 Task ?你能解释一下吗?
    • @MenelaosVergis 第一个Task.WhenAll 在返回已完成任务的任务上执行(即Task.WhenAnys 的结果)。然后我用 where 子句过滤这些任务。最后我在这些任务上使用Task.WhenAll 来提取它们的实际结果。所有这些任务此时应该已经完成​​了。
    • 我建议使用TaskScheduler.Default 作为参数配置ContinueWith,以避免在当前可能处于活动状态的任何古怪的环境TaskScheduler.Current 上运行延续。例如UITaskScheduler,或LowPriorityTaskScheduler
    • @TheodorZoulias 相反,Current scheculer 是最好的选择,当所有 continuation 只是进行非常短的调用或只返回一个值时。没有理由承担将调用编组到另一个线程的成本。当线程已经处于活动状态时,它是 UI TaskScheduler 还是低优先级调度程序有什么关系?如果有的话,askContinuationOptions.ExecuteSynchronously 可以用来确保使用相同的线程来避免重新调度
    • @TheodorZoulias 我认为您没有意识到您正在告诉一位创建性能指南的 Microsoft perf 工程师他的代码是错误的。我认为您误解了该指南并将其推向了极端,却不了解它为何适用以及何时适用。使用什么线程返回一个常量值有什么关系?至于您的解决方案,您是否考虑过这么多需要解包的打包任务的复杂性和成本?
    【解决方案3】:

    查看 Microsoft Consuming the Task-based Asynchronous Pattern 中的“Early Bailout”和“Task.Delay”部分。

    早期救助。由 t1 表示的操作可以分组为 WhenAny 与另一个任务 t2,我们可以等待 WhenAny 任务。 t2 可能表示超时、取消或其他一些信号 将导致 WhenAny 任务在 t1 完成之前完成。

    【讨论】:

    • 你想添加它所说的摘要吗?
    • 不知道你为什么回到这篇文章,但你的代码示例正是论文所描述的(我假设你很清楚)。应您的要求,我已用逐字引用更新了我的答案。
    • @DavidPeden 这个链接现在坏了,谷歌搜索已经找到了这篇文章,不确定这是否也是你所指的。 docs.microsoft.com/en-us/dotnet/standard/…
    • 谢谢。我已经更新了链接,这是您链接的同一根文档下的第三篇文章。
    【解决方案4】:

    您所描述的似乎是一个非常普遍的需求,但是我在任何地方都找不到这样的例子。我搜索了很多......我终于创建了以下内容:

    TimeSpan timeout = TimeSpan.FromSeconds(5.0);
    
    Task<Task>[] tasksOfTasks =
    {
        Task.WhenAny(SomeTaskAsync("a"), Task.Delay(timeout)),
        Task.WhenAny(SomeTaskAsync("b"), Task.Delay(timeout)),
        Task.WhenAny(SomeTaskAsync("c"), Task.Delay(timeout))
    };
    
    Task[] completedTasks = await Task.WhenAll(tasksOfTasks);
    
    List<MyResult> = completedTasks.OfType<Task<MyResult>>().Select(task => task.Result).ToList();
    

    我假设这里有一个返回 Task 的方法 SomeTaskAsync。

    从 completedTasks 的成员中,只有 MyResult 类型的任务是我们自己的任务,它设法赶上了时间。 Task.Delay 返回不同的类型。 这需要在打字方面做出一些妥协,但仍然可以很好地工作并且非常简单。

    (当然可以使用查询 + ToArray 动态构建数组)。

    • 请注意,此实现不需要 SomeTaskAsync 来接收取消令牌。

    【讨论】:

    • 这看起来应该被封装到一个辅助方法中。
    • @ErezCohen 如果您想看一下,我的回答会更加简单:stackoverflow.com/a/25733275/885318
    • @I3arnon - 不错!我喜欢它。
    【解决方案5】:

    除了超时,我还检查了取消,如果你正在构建一个 web 应用程序,这很有用。

    public static async Task WhenAll(
        IEnumerable<Task> tasks, 
        int millisecondsTimeOut,
        CancellationToken cancellationToken)
    {
        using(Task timeoutTask = Task.Delay(millisecondsTimeOut))
        using(Task cancellationMonitorTask = Task.Delay(-1, cancellationToken))
        {
            Task completedTask = await Task.WhenAny(
                Task.WhenAll(tasks), 
                timeoutTask, 
                cancellationMonitorTask
            );
    
            if (completedTask == timeoutTask)
            {
                throw new TimeoutException();
            }
            if (completedTask == cancellationMonitorTask)
            {
                throw new OperationCanceledException();
            }
            await completedTask;
        }
    }
    

    【讨论】:

    • 此代码存在问题:如果任何常规任务完成,则不会等待超时任务。因此,您的代码将运行直到超时任务被释放,如果在超时任务运行之前最后,你会得到一个 InvalidStateOperation。让任务保持不变,你就没事了。
    • @TheodorZoulias 您需要处理任务。如果发生超时或引发取消,则由调用者决定如何处理正在运行的任务。
    • Tony 在linked article Stephen Toub 中说“不。不要费心处理你的任务。”。你说“你需要处理任务。”我很困惑。我应该听从谁的建议?
    • @TheodorZoulias 对于造成的混乱,我深表歉意。我的意思是由调用者决定剩余的IEnumerable&lt;Task&gt; tasks 是否应该继续运行,或者如果发生取消或超时,是否应该取消/停止(处置)它们。我松散地使用了“处置”一词。你不必打电话给Task.Dispose
    【解决方案6】:

    查看http://tutorials.csharp-online.net/Task_Combinators 中提出的自定义任务组合器

    async static Task<TResult> WithTimeout<TResult> 
       (this Task<TResult> task, TimeSpan timeout)
     {
       Task winner = await (Task.WhenAny 
          (task, Task.Delay (timeout)));
       if (winner != task) throw new TimeoutException();
       return await task; // Unwrap result/re-throw
    }
    

    我还没试过。

    【讨论】:

    • a) 链接已损坏。 b)这适用于单个任务,这不是 OP 所要求的。
    【解决方案7】:

    @i3arnon 答案的无效结果版本,以及 cmets 和更改第一个参数以使用扩展名 this。

    我还有一个转发方法,使用TimeSpan.FromMilliseconds(millisecondsTimeout) 将超时指定为 int 以匹配其他 Task 方法。

    public static async Task WhenAll(this IEnumerable<Task> tasks, TimeSpan timeout)
    {
      // Create a timeout task.
      var timeoutTask = Task.Delay(timeout);
    
      // Get the completed tasks made up of...
      var completedTasks =
      (
        // ...all tasks specified
        await Task.WhenAll(tasks
    
        // Now finish when its task has finished or the timeout task finishes
        .Select(task => Task.WhenAny(task, timeoutTask)))
      )
      // ...but not the timeout task
      .Where(task => task != timeoutTask);
    
      // And wait for the internal WhenAll to complete.
      await Task.WhenAll(completedTasks);
    }
    

    【讨论】:

      【解决方案8】:

      似乎您只需要带有 timeout 参数的 Task.WaitAll 重载 - 如果它返回 true,那么您知道它们都已完成 - 否则,您可以在 IsCompleted 上进行过滤。

      if (Task.WaitAll(tasks, myTimeout) == false)
      {
          tasks = tasks.Where(t => t.IsCompleted);
      }
      ...
      

      【讨论】:

      • 我认为这些任务都是在自己的线程中开始的,而新的异步函数不是,但如果我错了,请纠正我。我刚刚开始这个新的异步内容。
      • Task.WaitAll() 是阻塞的,所以如果可以避免的话,在 C# 5 中使用它不是一个好主意。
      • @broersa 首先,我认为你弄错了,线程和Tasks 或async 方法之间的关系并不是那么简单。其次,这有什么关系?
      • @svick Blocking 是我想要的词。现在事情已经明朗了。
      【解决方案9】:

      我找到了以下代码,可以满足我的需要:

      using System;
      using System.Collections.Generic;
      using System.Linq;
      using System.Threading.Tasks;
      using System.Net.Http;
      using System.Json;
      using System.Threading;
      
      namespace MyAsync
      {
          class Program
          {
              static void Main(string[] args)
              {
                  var cts = new CancellationTokenSource();
                  Console.WriteLine("Start Main");
                  List<Task<List<MyObject>>> listoftasks = new List<Task<List<MyObject>>>();
                  listoftasks.Add(GetGoogle(cts));
                  listoftasks.Add(GetTwitter(cts));
                  listoftasks.Add(GetSleep(cts));
                  listoftasks.Add(GetxSleep(cts));
      
                  List<MyObject>[] arrayofanswers = Task.WhenAll(listoftasks).Result;
                  List<MyObject> answer = new List<MyObject>();
                  foreach (List<MyObject> answers in arrayofanswers)
                  {
                      answer.AddRange(answers);
                  }
                  foreach (MyObject o in answer)
                  {
                      Console.WriteLine("{0} - {1}", o.name, o.origin);
                  }
                  Console.WriteLine("Press <Enter>");
                  Console.ReadLine();
              } 
      
              static async Task<List<MyObject>> GetGoogle(CancellationTokenSource cts) 
              {
                  try
                  {
                      Console.WriteLine("Start GetGoogle");
                      List<MyObject> l = new List<MyObject>();
                      var client = new HttpClient();
                      Task<HttpResponseMessage> awaitable = client.GetAsync("http://ajax.googleapis.com/ajax/services/search/web?v=1.0&q=broersa", cts.Token);
                      HttpResponseMessage res = await awaitable;
                      Console.WriteLine("After GetGoogle GetAsync");
                      dynamic data = JsonValue.Parse(res.Content.ReadAsStringAsync().Result);
                      Console.WriteLine("After GetGoogle ReadAsStringAsync");
                      foreach (var r in data.responseData.results)
                      {
                          l.Add(new MyObject() { name = r.titleNoFormatting, origin = "google" });
                      }
                      return l;
                  }
                  catch (TaskCanceledException)
                  {
                      return new List<MyObject>();
                  }
              }
      
              static async Task<List<MyObject>> GetTwitter(CancellationTokenSource cts)
              {
                  try
                  {
                      Console.WriteLine("Start GetTwitter");
                      List<MyObject> l = new List<MyObject>();
                      var client = new HttpClient();
                      Task<HttpResponseMessage> awaitable = client.GetAsync("http://search.twitter.com/search.json?q=broersa&rpp=5&include_entities=true&result_type=mixed",cts.Token);
                      HttpResponseMessage res = await awaitable;
                      Console.WriteLine("After GetTwitter GetAsync");
                      dynamic data = JsonValue.Parse(res.Content.ReadAsStringAsync().Result);
                      Console.WriteLine("After GetTwitter ReadAsStringAsync");
                      foreach (var r in data.results)
                      {
                          l.Add(new MyObject() { name = r.text, origin = "twitter" });
                      }
                      return l;
                  }
                  catch (TaskCanceledException)
                  {
                      return new List<MyObject>();
                  }
              }
      
              static async Task<List<MyObject>> GetSleep(CancellationTokenSource cts)
              {
                  try
                  {
                      Console.WriteLine("Start GetSleep");
                      List<MyObject> l = new List<MyObject>();
                      await Task.Delay(5000,cts.Token);
                      l.Add(new MyObject() { name = "Slept well", origin = "sleep" });
                      return l;
                  }
                  catch (TaskCanceledException)
                  {
                      return new List<MyObject>();
                  }
      
              } 
      
              static async Task<List<MyObject>> GetxSleep(CancellationTokenSource cts)
              {
                  Console.WriteLine("Start GetxSleep");
                  List<MyObject> l = new List<MyObject>();
                  await Task.Delay(2000);
                  cts.Cancel();
                  l.Add(new MyObject() { name = "Slept short", origin = "xsleep" });
                  return l;
              } 
      
          }
      }
      

      我的解释在我的博文中: http://blog.bekijkhet.com/2012/03/c-async-examples-whenall-whenany.html

      【讨论】:

        【解决方案10】:

        除了 svick 的回答之外,当我必须等待几项任务完成但在等待时必须处理其他事情时,以下内容对我有用:

        Task[] TasksToWaitFor = //Your tasks
        TimeSpan Timeout = TimeSpan.FromSeconds( 30 );
        
        while( true )
        {
            await Task.WhenAny( Task.WhenAll( TasksToWaitFor ), Task.Delay( Timeout ) );
            if( TasksToWaitFor.All( a => a.IsCompleted ) )
                break;
        
            //Do something else here
        }
        

        【讨论】:

          【解决方案11】:

          您可以使用以下代码:

                  var timeoutTime = 10;
          
                  var tasksResult = await Task.WhenAll(
                                          listOfTasks.Select(x => Task.WhenAny(
                                              x, Task.Delay(TimeSpan.FromMinutes(timeoutTime)))
                                          )
                                      );
          
          
                  var succeededtasksResponses = tasksResult
                                                         .OfType<Task<MyResult>>()
                                                         .Select(task => task.Result);
          
                  if (succeededtasksResponses.Count() != listOfTasks.Count())
                  {
                      // Not all tasks were completed
                      // Throw error or do whatever you want
                  }
          
                  //You can use the succeededtasksResponses that contains the list of successful responses
          

          它是如何工作的:

          您需要在 timeoutTime 变量中输入完成所有任务的时间限制。所以基本上所有任务都会在你在 timeoutTime 中设置的最大时间内等待。当所有任务都返回结果时,不会发生超时,会设置tasksResult。

          之后,我们只获得已完成的任务。未完成的任务将没有结果。

          【讨论】:

            【解决方案12】:

            我试图改进优秀的i3arnon's solution,以解决一些小问题,但我最终得到了一个完全不同的实现。我试图解决的两个问题是:

            1. 如果有多个任务失败,propagate the errors of all failed tasks,而不仅仅是列表中第一个失败任务的错误。
            2. 防止内存泄漏,以防所有任务完成的速度比timeout 快得多。 如果在循环中调用 WhenAll 并且 timeout 很大,则泄漏活动的 Task.Delay 可能会导致不可忽略的内存泄漏量。

            除此之外,我还添加了一个 cancellationToken 参数、解释此方法的作用的 XML 文档以及参数验证。这里是:

            /// <summary>
            /// Returns a task that will complete when all of the tasks have completed,
            /// or when the timeout has elapsed, or when the token is canceled, whatever
            /// comes first. In case the tasks complete first, the task contains the
            /// results/exceptions of all the tasks. In case the timeout elapsed first,
            /// the task contains the results/exceptions of the completed tasks only.
            /// In case the token is canceled first, the task is canceled. To determine
            /// whether a timeout has occured, compare the number of the results with
            /// the number of the tasks.
            /// </summary>
            public static Task<TResult[]> WhenAll<TResult>(
                Task<TResult>[] tasks,
                TimeSpan timeout, CancellationToken cancellationToken = default)
            {
                if (tasks == null) throw new ArgumentNullException(nameof(tasks));
                if (tasks.Any(t => t == null)) throw new ArgumentException(
                    $"The {nameof(tasks)} argument included a null value.", nameof(tasks));
                if (timeout < TimeSpan.Zero && timeout != Timeout.InfiniteTimeSpan)
                    throw new ArgumentOutOfRangeException(nameof(timeout));
            
                var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
                cts.CancelAfter(timeout);
            
                var continuations = tasks.Select(task => task.ContinueWith(_ => { }, cts.Token,
                    TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default));
            
                return Task.WhenAll(continuations).ContinueWith(whenAllContinuations =>
                {
                    cts.Dispose();
                    if (whenAllContinuations.IsCompletedSuccessfully) return Task.WhenAll(tasks);
                    cancellationToken.ThrowIfCancellationRequested();
                    return Task.WhenAll(tasks.Where(task => task.IsCompleted));
                }, TaskScheduler.Default).Unwrap();
            }
            

            这个WhenAll 实现elides async and await,一般不建议这样做。在这种情况下,有必要在not nested AggregateException 中传播所有错误。目的是尽可能准确地模拟内置 Task.WhenAll 方法的行为。

            使用示例:

            string[] results;
            Task<string[]> whenAllTask = WhenAll(tasks, TimeSpan.FromSeconds(15));
            try
            {
                results = await whenAllTask;
            }
            catch when (whenAllTask.IsFaulted) // It might also be canceled
            {
                // Log all errors
                foreach (var innerEx in whenAllTask.Exception.InnerExceptions)
                {
                    _logger.LogError(innerEx, innerEx.Message);
                }
                throw; // Propagate the error of the first failed task
            }
            if (results.Length < tasks.Length) throw new TimeoutException();
            return results;
            

            注意:上述 API 存在设计缺陷。如果至少有一项任务失败或被取消,则无法确定是否发生超时。 WhenAll返回的任务的Exception.InnerExceptions属性可能包含所有任务的异常,也可能是部分任务的异常,没有办法说哪个是哪个。不幸的是,我想不出解决这个问题的办法。

            【讨论】:

              猜你喜欢
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2018-08-26
              • 1970-01-01
              • 2020-10-25
              • 2016-09-04
              相关资源
              最近更新 更多