【发布时间】:2012-03-23 21:19:57
【问题描述】:
在新的 async dotnet 4.5 库中是否有办法在 Task.WhenAll 方法上设置超时?我想获取几个来源,并在 5 秒后停止,并跳过未完成的来源。
【问题讨论】:
标签: c# asynchronous async-await task-parallel-library
在新的 async dotnet 4.5 库中是否有办法在 Task.WhenAll 方法上设置超时?我想获取几个来源,并在 5 秒后停止,并跳过未完成的来源。
【问题讨论】:
标签: c# asynchronous async-await task-parallel-library
您可以使用Task.WhenAny() 将生成的Task 与Task.Delay() 结合起来:
await Task.WhenAny(Task.WhenAll(tasks), Task.Delay(timeout));
如果您想在超时的情况下收获已完成的任务:
var completedResults =
tasks
.Where(t => t.Status == TaskStatus.RanToCompletion)
.Select(t => t.Result)
.ToList();
【讨论】:
我认为does exception handling right 的更清晰、更强大的选项是在每个任务上使用Task.WhenAny 和timeout task,检查所有已完成的任务并过滤掉超时任务,然后使用@987654328 @ 而不是 Task.Result 来收集所有结果。
这是一个完整的工作解决方案:
static async Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks, TimeSpan timeout)
{
var timeoutTask = Task.Delay(timeout).ContinueWith(_ => default(TResult));
var completedTasks =
(await Task.WhenAll(tasks.Select(task => Task.WhenAny(task, timeoutTask)))).
Where(task => task != timeoutTask);
return await Task.WhenAll(completedTasks);
}
【讨论】:
Task.WhenAll 在返回已完成任务的任务上执行(即Task.WhenAnys 的结果)。然后我用 where 子句过滤这些任务。最后我在这些任务上使用Task.WhenAll 来提取它们的实际结果。所有这些任务此时应该已经完成了。
TaskScheduler.Default 作为参数配置ContinueWith,以避免在当前可能处于活动状态的任何古怪的环境TaskScheduler.Current 上运行延续。例如UITaskScheduler,或LowPriorityTaskScheduler。
Current scheculer 是最好的选择,当所有 continuation 只是进行非常短的调用或只返回一个值时。没有理由承担将调用编组到另一个线程的成本。当线程已经处于活动状态时,它是 UI TaskScheduler 还是低优先级调度程序有什么关系?如果有的话,askContinuationOptions.ExecuteSynchronously 可以用来确保使用相同的线程来避免重新调度
查看 Microsoft Consuming the Task-based Asynchronous Pattern 中的“Early Bailout”和“Task.Delay”部分。
早期救助。由 t1 表示的操作可以分组为 WhenAny 与另一个任务 t2,我们可以等待 WhenAny 任务。 t2 可能表示超时、取消或其他一些信号 将导致 WhenAny 任务在 t1 完成之前完成。
【讨论】:
您所描述的似乎是一个非常普遍的需求,但是我在任何地方都找不到这样的例子。我搜索了很多......我终于创建了以下内容:
TimeSpan timeout = TimeSpan.FromSeconds(5.0);
Task<Task>[] tasksOfTasks =
{
Task.WhenAny(SomeTaskAsync("a"), Task.Delay(timeout)),
Task.WhenAny(SomeTaskAsync("b"), Task.Delay(timeout)),
Task.WhenAny(SomeTaskAsync("c"), Task.Delay(timeout))
};
Task[] completedTasks = await Task.WhenAll(tasksOfTasks);
List<MyResult> = completedTasks.OfType<Task<MyResult>>().Select(task => task.Result).ToList();
我假设这里有一个返回 Task
从 completedTasks 的成员中,只有 MyResult 类型的任务是我们自己的任务,它设法赶上了时间。 Task.Delay 返回不同的类型。 这需要在打字方面做出一些妥协,但仍然可以很好地工作并且非常简单。
(当然可以使用查询 + ToArray 动态构建数组)。
【讨论】:
除了超时,我还检查了取消,如果你正在构建一个 web 应用程序,这很有用。
public static async Task WhenAll(
IEnumerable<Task> tasks,
int millisecondsTimeOut,
CancellationToken cancellationToken)
{
using(Task timeoutTask = Task.Delay(millisecondsTimeOut))
using(Task cancellationMonitorTask = Task.Delay(-1, cancellationToken))
{
Task completedTask = await Task.WhenAny(
Task.WhenAll(tasks),
timeoutTask,
cancellationMonitorTask
);
if (completedTask == timeoutTask)
{
throw new TimeoutException();
}
if (completedTask == cancellationMonitorTask)
{
throw new OperationCanceledException();
}
await completedTask;
}
}
【讨论】:
IEnumerable<Task> tasks 是否应该继续运行,或者如果发生取消或超时,是否应该取消/停止(处置)它们。我松散地使用了“处置”一词。你不必打电话给Task.Dispose。
查看http://tutorials.csharp-online.net/Task_Combinators 中提出的自定义任务组合器
async static Task<TResult> WithTimeout<TResult>
(this Task<TResult> task, TimeSpan timeout)
{
Task winner = await (Task.WhenAny
(task, Task.Delay (timeout)));
if (winner != task) throw new TimeoutException();
return await task; // Unwrap result/re-throw
}
我还没试过。
【讨论】:
@i3arnon 答案的无效结果版本,以及 cmets 和更改第一个参数以使用扩展名 this。
我还有一个转发方法,使用TimeSpan.FromMilliseconds(millisecondsTimeout) 将超时指定为 int 以匹配其他 Task 方法。
public static async Task WhenAll(this IEnumerable<Task> tasks, TimeSpan timeout)
{
// Create a timeout task.
var timeoutTask = Task.Delay(timeout);
// Get the completed tasks made up of...
var completedTasks =
(
// ...all tasks specified
await Task.WhenAll(tasks
// Now finish when its task has finished or the timeout task finishes
.Select(task => Task.WhenAny(task, timeoutTask)))
)
// ...but not the timeout task
.Where(task => task != timeoutTask);
// And wait for the internal WhenAll to complete.
await Task.WhenAll(completedTasks);
}
【讨论】:
似乎您只需要带有 timeout 参数的 Task.WaitAll 重载 - 如果它返回 true,那么您知道它们都已完成 - 否则,您可以在 IsCompleted 上进行过滤。
if (Task.WaitAll(tasks, myTimeout) == false)
{
tasks = tasks.Where(t => t.IsCompleted);
}
...
【讨论】:
Task.WaitAll() 是阻塞的,所以如果可以避免的话,在 C# 5 中使用它不是一个好主意。
Tasks 或async 方法之间的关系并不是那么简单。其次,这有什么关系?
我找到了以下代码,可以满足我的需要:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Net.Http;
using System.Json;
using System.Threading;
namespace MyAsync
{
class Program
{
static void Main(string[] args)
{
var cts = new CancellationTokenSource();
Console.WriteLine("Start Main");
List<Task<List<MyObject>>> listoftasks = new List<Task<List<MyObject>>>();
listoftasks.Add(GetGoogle(cts));
listoftasks.Add(GetTwitter(cts));
listoftasks.Add(GetSleep(cts));
listoftasks.Add(GetxSleep(cts));
List<MyObject>[] arrayofanswers = Task.WhenAll(listoftasks).Result;
List<MyObject> answer = new List<MyObject>();
foreach (List<MyObject> answers in arrayofanswers)
{
answer.AddRange(answers);
}
foreach (MyObject o in answer)
{
Console.WriteLine("{0} - {1}", o.name, o.origin);
}
Console.WriteLine("Press <Enter>");
Console.ReadLine();
}
static async Task<List<MyObject>> GetGoogle(CancellationTokenSource cts)
{
try
{
Console.WriteLine("Start GetGoogle");
List<MyObject> l = new List<MyObject>();
var client = new HttpClient();
Task<HttpResponseMessage> awaitable = client.GetAsync("http://ajax.googleapis.com/ajax/services/search/web?v=1.0&q=broersa", cts.Token);
HttpResponseMessage res = await awaitable;
Console.WriteLine("After GetGoogle GetAsync");
dynamic data = JsonValue.Parse(res.Content.ReadAsStringAsync().Result);
Console.WriteLine("After GetGoogle ReadAsStringAsync");
foreach (var r in data.responseData.results)
{
l.Add(new MyObject() { name = r.titleNoFormatting, origin = "google" });
}
return l;
}
catch (TaskCanceledException)
{
return new List<MyObject>();
}
}
static async Task<List<MyObject>> GetTwitter(CancellationTokenSource cts)
{
try
{
Console.WriteLine("Start GetTwitter");
List<MyObject> l = new List<MyObject>();
var client = new HttpClient();
Task<HttpResponseMessage> awaitable = client.GetAsync("http://search.twitter.com/search.json?q=broersa&rpp=5&include_entities=true&result_type=mixed",cts.Token);
HttpResponseMessage res = await awaitable;
Console.WriteLine("After GetTwitter GetAsync");
dynamic data = JsonValue.Parse(res.Content.ReadAsStringAsync().Result);
Console.WriteLine("After GetTwitter ReadAsStringAsync");
foreach (var r in data.results)
{
l.Add(new MyObject() { name = r.text, origin = "twitter" });
}
return l;
}
catch (TaskCanceledException)
{
return new List<MyObject>();
}
}
static async Task<List<MyObject>> GetSleep(CancellationTokenSource cts)
{
try
{
Console.WriteLine("Start GetSleep");
List<MyObject> l = new List<MyObject>();
await Task.Delay(5000,cts.Token);
l.Add(new MyObject() { name = "Slept well", origin = "sleep" });
return l;
}
catch (TaskCanceledException)
{
return new List<MyObject>();
}
}
static async Task<List<MyObject>> GetxSleep(CancellationTokenSource cts)
{
Console.WriteLine("Start GetxSleep");
List<MyObject> l = new List<MyObject>();
await Task.Delay(2000);
cts.Cancel();
l.Add(new MyObject() { name = "Slept short", origin = "xsleep" });
return l;
}
}
}
我的解释在我的博文中: http://blog.bekijkhet.com/2012/03/c-async-examples-whenall-whenany.html
【讨论】:
除了 svick 的回答之外,当我必须等待几项任务完成但在等待时必须处理其他事情时,以下内容对我有用:
Task[] TasksToWaitFor = //Your tasks
TimeSpan Timeout = TimeSpan.FromSeconds( 30 );
while( true )
{
await Task.WhenAny( Task.WhenAll( TasksToWaitFor ), Task.Delay( Timeout ) );
if( TasksToWaitFor.All( a => a.IsCompleted ) )
break;
//Do something else here
}
【讨论】:
您可以使用以下代码:
var timeoutTime = 10;
var tasksResult = await Task.WhenAll(
listOfTasks.Select(x => Task.WhenAny(
x, Task.Delay(TimeSpan.FromMinutes(timeoutTime)))
)
);
var succeededtasksResponses = tasksResult
.OfType<Task<MyResult>>()
.Select(task => task.Result);
if (succeededtasksResponses.Count() != listOfTasks.Count())
{
// Not all tasks were completed
// Throw error or do whatever you want
}
//You can use the succeededtasksResponses that contains the list of successful responses
它是如何工作的:
您需要在 timeoutTime 变量中输入完成所有任务的时间限制。所以基本上所有任务都会在你在 timeoutTime 中设置的最大时间内等待。当所有任务都返回结果时,不会发生超时,会设置tasksResult。
之后,我们只获得已完成的任务。未完成的任务将没有结果。
【讨论】:
我试图改进优秀的i3arnon's solution,以解决一些小问题,但我最终得到了一个完全不同的实现。我试图解决的两个问题是:
timeout 快得多。
如果在循环中调用 WhenAll 并且 timeout 很大,则泄漏活动的 Task.Delay 可能会导致不可忽略的内存泄漏量。除此之外,我还添加了一个 cancellationToken 参数、解释此方法的作用的 XML 文档以及参数验证。这里是:
/// <summary>
/// Returns a task that will complete when all of the tasks have completed,
/// or when the timeout has elapsed, or when the token is canceled, whatever
/// comes first. In case the tasks complete first, the task contains the
/// results/exceptions of all the tasks. In case the timeout elapsed first,
/// the task contains the results/exceptions of the completed tasks only.
/// In case the token is canceled first, the task is canceled. To determine
/// whether a timeout has occured, compare the number of the results with
/// the number of the tasks.
/// </summary>
public static Task<TResult[]> WhenAll<TResult>(
Task<TResult>[] tasks,
TimeSpan timeout, CancellationToken cancellationToken = default)
{
if (tasks == null) throw new ArgumentNullException(nameof(tasks));
if (tasks.Any(t => t == null)) throw new ArgumentException(
$"The {nameof(tasks)} argument included a null value.", nameof(tasks));
if (timeout < TimeSpan.Zero && timeout != Timeout.InfiniteTimeSpan)
throw new ArgumentOutOfRangeException(nameof(timeout));
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeout);
var continuations = tasks.Select(task => task.ContinueWith(_ => { }, cts.Token,
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default));
return Task.WhenAll(continuations).ContinueWith(whenAllContinuations =>
{
cts.Dispose();
if (whenAllContinuations.IsCompletedSuccessfully) return Task.WhenAll(tasks);
cancellationToken.ThrowIfCancellationRequested();
return Task.WhenAll(tasks.Where(task => task.IsCompleted));
}, TaskScheduler.Default).Unwrap();
}
这个WhenAll 实现elides async and await,一般不建议这样做。在这种情况下,有必要在not nested AggregateException 中传播所有错误。目的是尽可能准确地模拟内置 Task.WhenAll 方法的行为。
使用示例:
string[] results;
Task<string[]> whenAllTask = WhenAll(tasks, TimeSpan.FromSeconds(15));
try
{
results = await whenAllTask;
}
catch when (whenAllTask.IsFaulted) // It might also be canceled
{
// Log all errors
foreach (var innerEx in whenAllTask.Exception.InnerExceptions)
{
_logger.LogError(innerEx, innerEx.Message);
}
throw; // Propagate the error of the first failed task
}
if (results.Length < tasks.Length) throw new TimeoutException();
return results;
注意:上述 API 存在设计缺陷。如果至少有一项任务失败或被取消,则无法确定是否发生超时。 WhenAll返回的任务的Exception.InnerExceptions属性可能包含所有任务的异常,也可能是部分任务的异常,没有办法说哪个是哪个。不幸的是,我想不出解决这个问题的办法。
【讨论】: