【问题标题】:Correct pattern to check if an async Task completed synchronously before awaiting it正确的模式来检查异步任务是否在等待之前同步完成
【发布时间】:2022-02-02 01:08:50
【问题描述】:

我有一堆请求要处理,其中一些可能会同步完成。 我想收集所有立即可用的结果并尽早返回,同时等待其余结果。

大概是这样的:

List<Task<Result>> tasks = new ();
List<Result> results = new ();

foreach (var request in myRequests) {
  var task = request.ProcessAsync();
  if (task.IsCompleted)
    results.Add(task.Result);  // or  Add(await task)  ?
  else 
    tasks.Add(task);
}

// send results that are available "immediately" while waiting for the rest
if (results.Count > 0)  SendResults(results);

results = await Task.WhenAll(tasks);
SendResults(results);

我不确定依赖IsCompleted 是否是个坏主意;是否会出现无法信任其结果的情况,或者可能再次变回false等情况?

同样,即使在检查了IsCompleted 之后,使用task.Result 会不会很危险,应该总是更喜欢await task 吗?如果使用ValueTask 而不是Task 会怎样?

【问题讨论】:

  • 用一种方法(SendResults)处理多个结果重要吗?您可以等待结果并单独发送每个..
  • 在这个虚构的例子中,一次处理多个结果并不重要@alexm。
  • 单独等待每个结果意味着依次等待每个请求。如果每个请求需要一秒钟并且有 10 个,则用户将不得不等待 10 秒钟。启动所有 10 个并等待所有这些将需要大约 1 秒。我的目标是更快地返回一些结果,例如因为它们是立即可用的缓存结果(例如,100 毫秒后发送 3 个结果,约 1 秒后发送 7 个结果)
  • 你在寻找类似Jon Skeet's OrderByCompletion method的东西吗?
  • 作为旁注,在您的示例中,如果一个任务失败,该任务的异常将立即传播,其余任务将继续运行,无需观察到即发即弃时尚。这是你的意图吗?

标签: c# async-await


【解决方案1】:

我不确定依赖 IsCompleted 是否是个坏主意;会不会有其结果不可信的情况……

如果您处于多线程上下文中,则 IsCompleted 可能会在您检查它时返回 false,但此后它会立即完成。在您使用的代码这样的情况下,发生这种情况的成本会非常低,所以我不会担心。

或者它可能会再次变回false等?

不,任务一旦完成,就不能不完成。

即使在检查了IsCompleted 之后,使用task.Result 会不会很危险。

不,那应该总是安全的。

应该总是更喜欢await task吗?

await 在您没有特定理由去做其他事情时是一个很好的默认值,但在许多用例中其他模式可能有用。您突出显示的用例就是一个很好的例子,您希望在不等待所有任务的情况下返回已完成任务的结果。

正如 Stephen Cleary 在下面的评论中提到的,使用await 来维持预期的异常行为可能仍然值得。你可能会考虑做更多这样的事情:

var requestsByIsCompleted = myRequests.ToLookup(r => r.IsCompleted);

// send results that are available "immediately" while waiting for the rest
SendResults(await Task.WhenAll(requestsByIsCompleted[true]));
SendResults(await Task.WhenAll(requestsByIsCompleted[false]));

如果使用ValueTask 而不是Task 会怎样?

以上答案同样适用于这两种类型。

【讨论】:

  • 即使IsCompleted 会返回false,即使它刚刚变成true,这也只是意味着任务是稍后处理而不是立即处理的。 IE。不是错误,只是延迟,可能是性能问题。不过,在我的情况下不是。感谢您的回答!
  • 不推荐使用 Result,因为它将异常包装在 AggregateException 中。首选awaitIsCompleted 时总是同步运行)或GetAwaiter().GetResult()
  • @StephenCleary 好点。我更新了我的答案。
【解决方案2】:

您可以使用这样的代码不断发送已完成任务的结果,同时等待其他人完成。

foreach (var request in myRequests)
{
    tasks.Add(request.ProcessAsync());
}

// wait for at least one task to be complete, then send all available results
while (tasks.Count > 0)
{
    // wait for at least one task to complete
    Task.WaitAny(tasks.ToArray());

    // send results for each completed task
    var completedTasks = tasks.Where(t => t.IsCompleted);
    var results = completedTasks.Where(t => t.IsCompletedSuccessfully).Select(t => t.Result).ToList();
    SendResults(results);

    // TODO: handle completed but failed tasks here

    // remove completed tasks from the tasks list and keep waiting
    tasks.RemoveAll(t => completedTasks.Contains(t));
}

【讨论】:

  • 感谢您的回答,约翰!但是,您也依赖其中的IsCompletedResult,而我的问题的重点是这样做是否安全。如果是这种情况,那么是的,您的答案将按照您的描述进行。我的实际场景不是我提出的问题,我只是做了一个简化的例子来演示它。
  • @StriplingWarrior 的回答很到位:两者都是安全的。在我写完演示代码后,我看到他们已经发布了一个很好的答案,但我还是想分享代码以防万一。
【解决方案3】:

仅使用await 即可实现所需的行为:


async Task ProcessAsync(MyRequest request, Sender sender)
{
     var result = await request.ProcessAsync();
     await sender.SendAsync(result);   
}

...

async Task ProcessAll()
{

   var tasks = new List<Task>();
   foreach(var request in requests) 
   {
      var task = ProcessAsync(request, sender);
      // Dont await until all requests are queued up
      tasks.Add(task);
   }
   // Await on all outstanding requests 
   await Task.WhenAll(tasks);

}



【讨论】:

  • 确实如此,但我的实际情况不同。有问题的代码只是一个例子来解释我在问什么
【解决方案4】:

已经有很好的答案,但除此之外还有我的建议,关于如何处理多个任务并以不同方式处理每个任务,也许它会满足您的需求。我的示例是事件,但您可以将它们替换为适合您需要的某种状态管理。

    public interface IRequestHandler
    {
        event Func<object, Task> Ready;
        Task ProcessAsync();
    }

    public class RequestHandler : IRequestHandler
    {
        // Hier where you wraps your request:
        // private object request;
        private readonly int value;

        public RequestHandler(int value)
            => this.value = value;

        public event Func<object, Task> Ready;

        public async Task ProcessAsync()
        {
            await Task.Delay(1000 * this.value);
            // Hier where you calls:
            // var result = await request.ProcessAsync();
            //... then do something over the result or wrap the call in try catch for example
            var result = $"RequestHandler {this.value} - [{DateTime.Now.ToLongTimeString()}]";

            if (this.Ready is not null)
            {
                // If result passes send the result to all subscribers

                await this.Ready.Invoke($"RequestHandler {this.value} - [{DateTime.Now.ToLongTimeString()}]");
            }
        }
    }

    static void Main()
    {
        var a = new RequestHandler(1);
        a.Ready += PrintAsync; 
        var b = new RequestHandler(2);
        b.Ready += PrintAsync;
        var c = new RequestHandler(3);
        c.Ready += PrintAsync;
        var d= new RequestHandler(4);
        d.Ready += PrintAsync;
        var e = new RequestHandler(5);
        e.Ready += PrintAsync;
        var f = new RequestHandler(6);
        f.Ready += PrintAsync;

        var requests = new List<IRequestHandler>()
        {
            a, b, c, d, e, f
        };

        var tasks = requests
            .Select(x => Task.Run(x.ProcessAsync));

        // Hier you must await all of the tasks
        Task
            .Run(async () => await Task.WhenAll(tasks))
            .Wait();
    }

    static Task PrintAsync(object output)
    {
        Console.WriteLine(output);

        return Task.CompletedTask;
    }

【讨论】:

    最近更新 更多