【问题标题】:Parallel foreach with asynchronous lambda带有异步 lambda 的并行 foreach
【发布时间】:2013-02-14 16:52:52
【问题描述】:

我想并行处理一个集合,但我在实现它时遇到了麻烦,因此我希望得到一些帮助。

如果我想在并行循环的 lambda 中调用 C# 中标记为 async 的方法,就会出现问题。例如:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

计数为 0 时会出现问题,因为创建的所有线程实际上只是后台线程,Parallel.ForEach 调用不会等待完成。如果我删除 async 关键字,方法如下所示:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = await GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

它可以工作,但它完全禁用了 await 聪明,我必须做一些手动异常处理..(为简洁起见已删除)。

如何实现Parallel.ForEach 循环,它在 lambda 中使用 await 关键字?有可能吗?

Parallel.ForEach 方法的原型采用 Action&lt;T&gt; 作为参数,但我希望它等待我的异步 lambda。

【问题讨论】:

  • 我假设您打算在第二个代码块中从 await GetData(item) 中删除 await,因为它会按原样产生编译错误。
  • 附带说明,ConcurrentBag&lt;T&gt; 是一个very specialized 集合。在这种情况下,ConcurrentQueue&lt;T&gt; 会更好地为您服务。

标签: c# async-await task-parallel-library parallel.foreach


【解决方案1】:

使用SemaphoreSlim可以实现并行控制。

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  await throttler.WaitAsync();
  try
  {
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;

【讨论】:

  • SemaphoreSlim 应该用 using 语句包裹,因为它实现了 IDisposable
  • 另外,这一行“await throttler.WaitAsync();”不应在 try 块中,因为如果它引发异常,您将在未获得锁时调用 Release。
  • @tim 假设您的评论已解决,请您删除它(和/或@我,如果我忘记删除它!)?
【解决方案2】:

随着 .Net 6 的引入,Parallel.ForEachAsync 现已推出。

using System.Net.Http.Headers;
using System.Net.Http.Json;
 
var userHandlers = new []
{
    "users/okyrylchuk",
    "users/shanselman",
    "users/jaredpar",
    "users/davidfowl"
};
 
using HttpClient client = new()
{
    BaseAddress = new Uri("https://api.github.com"),
};
client.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("DotNet", "6"));
 
ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 3
};
 
await Parallel.ForEachAsync(userHandlers, parallelOptions, async (uri, token) =>
{
    var user = await client.GetFromJsonAsync<GitHubUser>(uri, token);
 
    Console.WriteLine($"Name: {user.Name}\nBio: {user.Bio}\n");
});
 
public class GitHubUser
{
    public string Name { get; set; }
    public string  Bio { get; set; }
}

github 上的完整问题跟踪 还有一些usage examples here by SCOTT HANSELMAN

【讨论】:

【解决方案3】:

其中一个新的 .NET 6 API 是 Parallel.ForEachAsync,这是一种安排异步工作的方法,允许您控制并行度:

var urls = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};

var client = new HttpClient();

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);

    var response = await client.GetAsync(url);

    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);

        await response.Content.CopyToAsync(target);
    }
});

【讨论】:

    【解决方案4】:

    从其他答案和公认的asnwer引用的文章编译的最简单的扩展方法:

    public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
    {
        var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
        var tasks = source.Select(async item =>
        {
            await throttler.WaitAsync();
            try
            {
                await asyncAction(item).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });
        await Task.WhenAll(tasks);
    }
    

    【讨论】:

    • 我会赞成,但我不喜欢随意的int maxDegreeOfParallelism = 10。最好让用户明确指定并发级别。您还可以考虑在asyncAction(item) 之后添加.ConfigureAwait(false)(但不在throttler.WaitAsync() 之后)。
    • @TheodorZoulias 两个优点,已编辑。还在开发一个支持cancellationToken 的变体将在稍后发布。
    • 值得注意的是,大多数基于Task.WhenAll的解决方案只适用于数量相对较少的任务,或者保证不会抛出的asyncActions。否则,等待半小时完成 10,000 个任务会非常令人沮丧,结果只是收到一个异常(这可能是第一个任务引发的)。
    • 这不是一个强大的解决方案,原因有两个。首先,如果抛出异常,它不会终止循环。其次,throttler 没有被释放。
    • @zmechanic 我认为是否中止异常循环取决于开发人员。
    【解决方案5】:

    对于更简单的解决方案(不确定是否是最佳解决方案),您可以简单地将 Parallel.ForEach 嵌套在 Task 中 - 就这样

    var options = new ParallelOptions { MaxDegreeOfParallelism = 5 }
    Task.Run(() =>
    {
        Parallel.ForEach(myCollection, options, item =>
        {
            DoWork(item);
        }
    }
    

    ParallelOptions 会为您执行节流,开箱即用。

    我在现实世界的场景中使用它在后台运行很长时间的操作。这些操作是通过 HTTP 调用的,它的设计目的是在长操作运行时不会阻塞 HTTP 调用。

    1. 为长时间后台操作调用 HTTP。
    2. 操作从后台开始。
    3. 用户获取状态 ID,可用于使用另一个 HTTP 调用检查状态。
    4. 后台操作更新其状态。

    这样,CI/CD 调用不会因为长时间的 HTTP 操作而超时,而是每隔 x 秒循环一次状态而不阻塞进程

    【讨论】:

    • Gravity 很抱歉,我不得不否决您的答案,但是将异步委托传递给 Parallel.ForEach 方法不仅仅是“不是最佳实践”。它存在严重且不可挽回的缺陷。 Parallel.ForEach 不理解异步委托,因此 lambda 为 async void。这不是一发不可收拾,而是一发不可收拾。在这种情况下,Parallel.ForEach 不会等待启动的操作完成,不会强制执行最大程度的并行,也不会传播异常。任何异常都将未处理,并使进程崩溃。
    • 重力这是一个不好的例子。并行化Console.WriteLine 方法没有意义,因为这个方法是同步的。一次只有一个线程可以写入Console。还要注意Thread.Sleep(15000); 的丑陋之处。您添加了这一行,因为否则程序将在由滥用的 Parallel.ForEach 循环无法控制地启动的 async void 操作完成之前结束。这不是编写软件的正确方法。
    • 好吧,无论您是否推荐它们,您都不能通过提供不良示例和间接宣传不良做法来获得好票。从你的答案中删除所有不好的东西并保留好的东西怎么样?
    • 重力短语 “您也可以使用异步 lambda”Parallel.ForEach 方法相关,是我无可争辩的反对意见。之前或之后的警告,或删除线之类的删除指示,都无法容忍此短语的存在。我只是在谈论我自己的投票标准。其他任何人都可以投票,但他们认为合适。
    • 接受并且有意义。没有那样想,我同意你的标准并相应地更改我的帖子。
    【解决方案6】:

    以下设置为使用IAsyncEnumerable,但可以修改为使用IEnumerable,只需更改类型并删除foreach 上的“等待”。这比创建无数并行任务然后等待所有任务更适合大量数据。

        public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
        {
            ActionBlock<T> block = new ActionBlock<T>(
               action, 
               new ExecutionDataflowBlockOptions 
               { 
                 MaxDegreeOfParallelism = maxDegreeOfParallelism, 
                 BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3 
               });
    
            await foreach (T item in enumerable)
            {
               await block.SendAsync(item).ConfigureAwait(false);
            }
    
            block.Complete();
            await block.Completion;
        }
    

    【讨论】:

    • 您应该将semaphore.Wait() 替换为await semaphore.WaitAsync(),以避免阻塞调用者。另请注意,您的解决方案中的SemaphoreSlim 的功能可以替换为ActionBlockBoundedCapacity 配置,以及可等待的SendAsync 方法。相比之下,它更有效(内存方面)。
    • @TheodorZoulias 非常感谢您的反馈!这是我正在为一个项目积极开展的工作,因此我将查看这些更改并更新我的解决方案。
    • @TheodorZoulias 的stackoverflow.com/a/65251949/477420 回答显示了非常相似的方法......大概SendAsync 不会等待操作完成(从文档中我不清楚)
    • Caleb Holt 您可能需要注意的另一个问题是,枚举用户提供的 enumerable 可能会导致异常,在这种情况下,您的实现将立即传播此异常,无需等待ActionBlock 的完成。这不是最佳行为,因为它可能会使在后台运行的任务不被观察(以即发即弃的方式)。正确实现ForEachAsync 方法可能非常棘手。我最近才意识到这个问题。
    • @AlexeiLevenkov SendAsync 方法的文档相当混乱。我怀疑这个星球上是否存在过足够聪明的人,仅仅通过阅读文档就可以理解这种方法的作用。应该深入研究源代码并了解PostSendAsync 方法都基于隐藏(显式实现)OfferMessage API,它有 5 个可能的返回值。 SendAsync 异步处理 Postponed 返回值。
    【解决方案7】:

    在接受的答案中,不需要 ConcurrentBag。 这是一个没有它的实现:

    var tasks = myCollection.Select(GetData).ToList();
    await Task.WhenAll(tasks);
    var results = tasks.Select(t => t.Result);
    

    任何“// some pre stuff”和“// some post stuff”都可以进入GetData实现(或另一个调用GetData的方法)

    除了更短之外,没有使用“async void”lambda,这是一种反模式。

    【讨论】:

      【解决方案8】:

      我的 ParallelForEach 异步的轻量级实现。

      特点:

      1. 限制(最大并行度)。
      2. 异常处理(完成时会抛出聚合异常)。
      3. 内存高效(无需存储任务列表)。

      public static class AsyncEx
      {
          public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
          {
              var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
              var tcs = new TaskCompletionSource<object>();
              var exceptions = new ConcurrentBag<Exception>();
              bool addingCompleted = false;
      
              foreach (T item in source)
              {
                  await semaphoreSlim.WaitAsync();
                  asyncAction(item).ContinueWith(t =>
                  {
                      semaphoreSlim.Release();
      
                      if (t.Exception != null)
                      {
                          exceptions.Add(t.Exception);
                      }
      
                      if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
                      {
                          tcs.TrySetResult(null);
                      }
                  });
              }
      
              Volatile.Write(ref addingCompleted, true);
              await tcs.Task;
              if (exceptions.Count > 0)
              {
                  throw new AggregateException(exceptions);
              }
          }
      }
      

      使用示例:

      await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
      {
          var data = await GetData(i);
      }, maxDegreeOfParallelism: 100);
      

      【讨论】:

      • tcs.SetResult(null) 需要替换为tcs.TrySetResult(null)
      • @Hocas,你认为为什么需要 TrySetResult?
      • 我上次使用此代码时多次调用SetResult 时遇到问题)When to use SetResult() vs TrySetResult()
      • @Hocas,这很有趣。 tcs.SetResult(null) 预计不会被执行两次。
      • 使用SemaphoreSlimCurrentCount 属性来控制执行流不是一个好主意。在大多数情况下,它会产生竞争条件。使用Volatile.Read 也很不稳定(另一种可能的竞争条件)。在生产环境中我不会相信这个解决方案。
      【解决方案9】:

      如果你只是想要简单的并行,你可以这样做:

      var bag = new ConcurrentBag<object>();
      var tasks = myCollection.Select(async item =>
      {
        // some pre stuff
        var response = await GetData(item);
        bag.Add(response);
        // some post stuff
      });
      await Task.WhenAll(tasks);
      var count = bag.Count;
      

      如果您需要更复杂的东西,请查看Stephen Toub's ForEachAsync post

      【讨论】:

      • 可能需要一个节流机制。这将立即创建尽可能多的任务,因为有可能以 10k 网络请求等而告终的项目。
      • @usr Stephen Toub 文章中的最后一个例子解决了这个问题。
      • @LukePuplett 它创建dop 任务,然后每个任务依次处理输入集合的某个子集。
      • @Afshin_Zavvar:如果你调用Task.Run 而没有await 得到结果,那么这只是将一劳永逸的工作扔到线程池上。这几乎总是一个错误。
      • 此方法的一个简单限制机制是将列表拆分为包含 N 个条目的小列表,并为每个较小的批次执行此任务 select + Task.WhenAll。这样您就不会为大型数据集生成数千个任务。
      【解决方案10】:

      您可以使用AsyncEnumerator NuGet Package 中的ParallelForEachAsync 扩展方法:

      using Dasync.Collections;
      
      var bag = new ConcurrentBag<object>();
      await myCollection.ParallelForEachAsync(async item =>
      {
        // some pre stuff
        var response = await GetData(item);
        bag.Add(response);
        // some post stuff
      }, maxDegreeOfParallelism: 10);
      var count = bag.Count;
      

      【讨论】:

      • 这是你的包裹?我看到你在几个地方发过这个帖子了? :D 哦等等.. 你的名字在包裹上 :D +1
      • @ppumkin,是的,这是我的。我一遍又一遍地看到这个问题,所以决定以最简单的方式解决它,并让其他人免于挣扎:)
      • 你打错了:maxDegreeOfParallelism > maxDegreeOfParalellism
      • 正确的拼写确实是 maxDegreeOfParallelism,但是在@ShiranDror 的评论中有一些东西 - 在你的包中你错误地调用了变量 maxDegreeOfParalellism(因此你引用的代码在你改变它之前不会编译..)
      • @SergeSemenov 在这种情况下,我认为您可能想要更新此答案中的链接,因为它指向 V1.10。既然你在这个问题上很活跃,我会把它留给你。
      【解决方案11】:

      我为此创建了一个扩展方法,它利用 SemaphoreSlim 并允许设置最大并行度

          /// <summary>
          /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
          /// </summary>
          /// <typeparam name="T">Type of IEnumerable</typeparam>
          /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
          /// <param name="action">an async <see cref="Action" /> to execute</param>
          /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
          /// Must be grater than 0</param>
          /// <returns>A Task representing an async operation</returns>
          /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
          public static async Task ForEachAsyncConcurrent<T>(
              this IEnumerable<T> enumerable,
              Func<T, Task> action,
              int? maxDegreeOfParallelism = null)
          {
              if (maxDegreeOfParallelism.HasValue)
              {
                  using (var semaphoreSlim = new SemaphoreSlim(
                      maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
                  {
                      var tasksWithThrottler = new List<Task>();
      
                      foreach (var item in enumerable)
                      {
                          // Increment the number of currently running tasks and wait if they are more than limit.
                          await semaphoreSlim.WaitAsync();
      
                          tasksWithThrottler.Add(Task.Run(async () =>
                          {
                              await action(item).ContinueWith(res =>
                              {
                                  // action is completed, so decrement the number of currently running tasks
                                  semaphoreSlim.Release();
                              });
                          }));
                      }
      
                      // Wait for all tasks to complete.
                      await Task.WhenAll(tasksWithThrottler.ToArray());
                  }
              }
              else
              {
                  await Task.WhenAll(enumerable.Select(item => action(item)));
              }
          }
      

      示例用法:

      await enumerable.ForEachAsyncConcurrent(
          async item =>
          {
              await SomeAsyncMethod(item);
          },
          5);
      

      【讨论】:

      • 'using' 无济于事。 foreach 循环将无限期地等待信号机。试试这个重现问题的简单代码: await Enumerable.Range(1, 4).ForEachAsyncConcurrent(async (i) => { Console.WriteLine(i); throw new Exception("test exception"); }, maxDegreeOfParallelism: 2);
      • @nicolay.anykienko 你是对的#2。内存问题可以通过添加 tasksWithThrottler.RemoveAll(x => x.IsCompleted); 来解决
      • 我已经在我的代码中尝试过了,如果我的 maxDegreeOfParallelism 不为空,代码就会死锁。在这里你可以看到所有要重现的代码:stackoverflow.com/questions/58793118/…
      • 当我考虑实现这种方法以供我使用时,我担心的是,我正在处理的 170 万行会导致每个行在 tasksWithThrottler 列表中都有一份工作,这似乎并不理想或真正可扩展的。发布我的队友和我想出的解决方案,将 ActionBlock 作为单独的解决方案。
      • 请添加取消令牌代码...以及plz
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-08-29
      相关资源
      最近更新 更多