【问题标题】:Catching exceptions immediately does not work with Task.WhenAll立即捕获异常不适用于 Task.WhenAll
【发布时间】:2020-09-24 19:35:43
【问题描述】:

我有一个类的两个实例,它们创建一个 UDP 套接字来接收来自 UDP 客户端的数据。如果其中一个实例抛出异常,我想立即在更高层处理它。在我的程序中,它们以await Task.WhenAll(recv1.StartAsync(), recv2.StartAsync) 开头。但是,这会在引发第一个异常之前等待所有任务完成。有关如何解决此问题的任何想法?

static async Task Main(string[] args)
{
  var udpReceiver1 = new UdpReceiver(localEndpoint1);
  var udpReceiver2 = new UdpReceiver(localEndpoint2);

  var cts = new CancellationTokenSource();

  try
  {
    await Task.WhenAll(udpReceiver1.StartAsync(cts.Token), udpReceiver2.StartAsync(cts.Token));
  }
  catch (Exception e)
  {
    // Handle Exception...

    cts.Cancel();
  }
}

class UdpReceiver
{
  public UdpReceiver(IPEndPoint endpoint)
  {
    udpClient = new UdpClient(endpoint);
  }

  public async Task StartAsync(CancellationToken cancellationToken)
  {
    try
    {
      while (!cancellationToken.IsCancellationRequested)
      {
        var result = await ReceiveAsync(cancellationToken);
        var message = Encoding.UTF8.GetString(result.Buffer);
        Trace.WriteLine($"UdpClient1 received message:{Encoding.UTF8.GetString(result.Buffer)}");
      
        // throw new Exception("UdpClient1 raising exception");
      }
    }
  }

  private async Task<UdpReceiveResult> ReceiveAsync(CancellationToken cancellationToken)
  {
    var tcs = new TaskCompletionSource<UdpReceiveResult>();
    using (cancellationToken.Register(() => tcs.TrySetCanceled(), false))
    {
      var task = udpClient.ReceiveAsync();

      var completedTask = await Task.WhenAny(task, tcs.Task);

      var result = await completedTask.ConfigureAwait(false);

      return result;
    }
  }

  private UdpClient udpClient;
}

更新 1: Task.WhenAny 将是一个可行的解决方案。谢谢@CamiloTerevinto

try
{
  await await Task.WhenAny(udpReceiver1.StartAsync(cts.Token), udpReceiver2.StartAsync(cts.Token));
}
catch (Exception e)
{
  // Handle Exception...

  cts.Cancel();
}

更新 2:为了对所有任务进行更细粒度的异常处理,我会使用 @Servy 提出的我自己改编的 Task.WhenAll 实现。

【问题讨论】:

  • 嗯,Task.WhenAll 正是让你等待所有个任务。您可以使用Task.WhenAny,但需要自己处理非故障
  • 如果实例失败,您希望发生什么?你想处理这个错误,然后忽略其他正在运行的实例而终止进程吗?
  • 两个实例都应该在异常情况下正常终止
  • 我认为您应该在问题中包含您希望两个实例都正常终止的问题。这是回答这个问题的重要信息。

标签: c# async-await task-parallel-library


【解决方案1】:

该行为与框架WhenAll 实现有很大不同,您最好编写自己的改编版本,幸运的是实现起来并不特别困难。只需为每个任务附加一个延续,如果有任何一个被取消或出错,则生成的任务也会执行相同的操作,如果成功则存储结果,如果最后一个任务是成功的,则使用所有存储的结果完成任务.

public static Task<IEnumerable<TResult>> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks)
{
    var listOfTasks = tasks.ToList();
    if (listOfTasks.Count == 0)
    {
        return Task.FromResult(Enumerable.Empty<TResult>());
    }
    var tcs = new TaskCompletionSource<IEnumerable<TResult>>();
    var results = new TResult[listOfTasks.Count];
    int completedTasks = 0;
    for (int i = 0; i < listOfTasks.Count; i++)
    {
        int taskIndex = i;
        Task<TResult> task = listOfTasks[i];
        task.ContinueWith(_ =>
        {
            if (task.IsCanceled)
                tcs.TrySetCanceled();
            else if (task.IsFaulted)
                tcs.TrySetException(task.Exception.InnerExceptions);
            else
            {
                results[taskIndex] = task.Result;
                if (Interlocked.Increment(ref completedTasks) == listOfTasks.Count)
                {
                    tcs.TrySetResult(results);
                }
            }
        });
    }
    return tcs.Task;
}

与许多基于任务的通用操作一样,您还需要一个没有结果的版本,如果您不想处理显着的开销,您真的只需要复制粘贴基于结果的方法,但要使用所有的结果都被撕掉了,这并不难,只是不优雅。将所有这些任务转化为有结果的任务也是可行的,但对于这样的操作,开销可能是有问题的。

public static Task WhenAll(IEnumerable<Task> tasks)
{
    var listOfTasks = tasks.ToList();
    if (listOfTasks.Count == 0)
    {
        return Task.CompletedTask;
    }
    var tcs = new TaskCompletionSource<bool>();
    int completedTasks = 0;
    for (int i = 0; i < listOfTasks.Count; i++)
    {
        int taskIndex = i;
        Task task = listOfTasks[i];
        task.ContinueWith(_ =>
        {
            if (task.IsCanceled)
                tcs.TrySetCanceled();
            else if (task.IsFaulted)
                tcs.TrySetException(task.Exception.InnerExceptions);
            else
            {
                if (Interlocked.Increment(ref completedTasks) == listOfTasks.Count)
                {
                    tcs.TrySetResult(true);
                }
            }
        });
    }
    return tcs.Task;
}

【讨论】:

  • 恕我直言,此WhenAll 方法的当前名称具有误导性。一个更好的名字可能是WhenAllSuccessfulOrAnyFailed
  • @TheodorZoulias 您可以在自己的实现中随意调用它。就我个人而言,我认为这是在等待一组任务完成时几乎总是更可取的行为,所以我更喜欢它。老实说,我希望 .NET 实现从一开始就以这种方式实现(显然不会发生改变它的重大改变)。毕竟,您是否将 foreach 称为“ForEachOrAnyFailed”?您是否调用 List.RemoveAll “RemoveAllOrAnyFailed”?操作在遇到异常时提前结束是典型的,而不是非典型的行为。
  • 很好的合理化尝试,但我不买它。 :-) 如果WhenAll 是这样实现的,它会鼓励一劳永逸:我开始了 10 个任务,一个失败了,另一个我忘记了 9 个。这是草率的,它会导致程序不那么健壮。跨度>
  • @TheodorZoulias 无论您使用哪种行为,您都不会观察其他任务的结果。无论哪种方式,操作都会出错,因此您会遇到异常。这与实际上引发异常的所有其他操作没有什么不同,而不是观察预期的结果,它过早停止并且您得到异常。根据您的逻辑,每个抛出异常的方法都会鼓励触发并忘记该操作,因为您在抛出异常时会忽略结果。
  • @TheodorZoulias 在您需要完成 10 次操作的结果才能继续执行且其中一次失败的所有时间中,您执行 9 次对程序的正确性很重要更多操作,等待它们完成,然后忽略它们的结果,以便您可以抛出原始异常?我希望在实践中,几乎每次您需要 10 次操作的结果来继续您的方法时,一旦其中一个抛出,您就不会关心其他 9 次。
【解决方案2】:

可能有一种方法可以做到这一点,但我想不出一种方法会使您的代码变得非常混乱。最好在实际任务中处理异常。如果您需要使用通用代码处理它,请使用处理程序委托。

static async Task Main(string[] args)
{
    var cts = new CancellationTokenSource();

    //This is our common error handler
    void HandleException(Exception ex)
    {
        Log("Exception!" + ex.Message);
        cts.Cancel();
    }

    var udpReceiver1 = new UdpReceiver(localEndpoint1);
    var udpReceiver2 = new UdpReceiver(localEndpoint1);

    //We pass the handler as one of the arguments
    await Task.WhenAll(udpReceiver1.StartAsync(cts.Token, HandleException), udpReceiver2.StartAsync(cts.Token, HandleException));
}

class UdpReceiver
{
  public async Task StartAsync(CancellationToken cancellationToken, Action<Exception> errorHandler)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              //Main logic goes here
          }
      }
      catch(Exception ex)
      {
          errorHandler(ex);  //Call common error handling code
      }
  }

【讨论】:

  • 这不会导致该方法在第一次失败后继续运行,这是它特别要求的。实际上,这里的用例甚至不是处理异常,而是在到达时立即将其传播给调用者,这通过强制捕获和处理它来特别防止发生根本通过有问题的方法,并且还强制它等到一切完成。
【解决方案3】:

您可以分两步等待任务。在第一步中等待 任何 完成,并在失败的情况下启动取消。在这一步不要处理异常。在等待所有任务完成后,将异常处理延迟到第二步。两项任务都可能失败,因此您可能需要分别处理每一项的异常。

Task task1 = udpReceiver1.StartAsync(cts.Token);
Task task2 = udpReceiver2.StartAsync(cts.Token);

// Await any task to complete
Task firstCompletedTask = await Task.WhenAny(task1, task2);
if (firstCompletedTask.IsFaulted) cts.Cancel();

try
{
    // Await them all to complete
    await Task.WhenAll(task1, task2);
}
catch
{
    if (task1.IsFaulted) HandleException(task1.Exception.InnerException);
    if (task2.IsFaulted) HandleException(task2.Exception.InnerException);
}

【讨论】:

  • 您打算如何扩展这样的解决方案以支持两个以上的任务?即使只是尝试将这种方法应用于 4 或 5 个任务,更不用说未知数量的任务,在代码复杂性和性能方面的扩展性也会很差。
  • @Servy 是的,如果任务超过两个,我当然需要用自定义的WhenAnyFailed 方法替换Task.WhenAny,实现类似于yoursWhenAll。 :-)
  • @Servy 实际上现在我正在考虑它,如果我有多个任务,我会放弃 Task.WhenAny 步骤,而是为每个任务附加一个延续,如果失败将取消令牌。我在this 答案(OnErrorCancel 方法)中实现了类似的东西。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-11-04
  • 1970-01-01
  • 1970-01-01
  • 2012-07-12
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多