【问题标题】:Trying to run multiple tasks in parallel with .WhenAll but tasks aren't being run in parallel尝试与 .WhenAll 并行运行多个任务,但任务没有并行运行
【发布时间】:2022-03-11 12:32:55
【问题描述】:

我有一种将 csv 文件转换为特定模型的方法,我想将其拆分为多个任务,因为有 700k+ 条记录。我在方法中使用了.Skip.Take,所以每次运行该方法都知道从哪里开始以及需要多少。我有一个数字 1-10 的列表,我想对其进行迭代并创建任务来运行此方法,使用该迭代器创建任务并进行一些数学运算以确定要跳过多少条记录。

这是我创建任务的方式:

var numberOfTasksList = Enumerable.Range(1, 10).ToList();
//I left out the math to determine rowsPerTask used as a parameter in the below method for brevity's sake
var tasks = numberOfTasksList.Select(i
                =>  ReadRowsList<T>(props, fields, csv, context, zohoEntities, i, i*rowsPerTask, (i-1)*rowsPerTask));

           await Task.WhenAll(tasks);

使用的ReadRowsList 方法如下所示(不带参数):

public static async Task<string> ReadRowsList<T>(...parameters) where T : class, new()
   {
     //work to run
     return $"added rows for task {i}";
   }

它返回的那个方法的字符串只是一个简单的行,上面写着 $" added rows for task {i}" 所以它不是一个真正的异步/等待,因为我只是返回一个字符串来说明迭代何时完成.

但是,当我运行程序时,该方法会等待第一次迭代(其中 i=1)完成,然后再开始运行程序的第二次迭代,因此它不是并行运行的。在异步/并行编程方面,我不是最好的,但是有什么明显的事情会导致任务必须等到上一个迭代完成才能开始下一个任务?据我了解,使用上述代码创建任务并使用.WhenAll(tasks) 会为每次迭代创建一个新线程,但我一定会遗漏一些东西。

【问题讨论】:

  • 您是否收到有关缺少 await 运算符的 async 方法的警告?
  • 顺便说一句,您是否考虑过使用Parallel.ForEachPartitioner 类(example 1example 2),而不是手动生成任务和分区?
  • 您所拥有的是如何并行执行异步方法(在异步部分等待时运行任务的非异步部分),但没有任何等待它可能只是同步代码。
  • @TheodorZoulias 我曾尝试在程序的另一部分使用 Parallel.ForEach,但是当我使用 HangFire 的 PerformContext 时,它阻止了我看到程序屏幕上正在读取的内容,但是我可以看看我是否可以解决这部分问题。谢谢你的例子。
  • 简单的经验法则是,如果您的代码受 CPU 限制(复杂的数学计算),则使用 Parallel.ForEach;如果您的代码受 IO 限制(文件 IO、数据库调用、Web 服务调用),则使用 async/await .

标签: c# async-await parallel-processing


【解决方案1】:

简而言之:

  1. async 不等于多线程;和
  2. 创建函数 async Task 不会使其异步

Task.WhenAll 使用没有awaits 的假装异步代码运行时,当前线程无法“释放”手头的任务,也无法开始处理另一个任务。

正如 cmets 中所指出的,构建链通过以下方式警告您: This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

简单的例子

让我们考虑两个具有相同签名的函数,一个带有异步代码,一个没有。

static async Task DoWorkPretendAsync(int taskId)
{
    Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} -> task:{taskId} > start");
    Thread.Sleep(TimeSpan.FromSeconds(1));
    Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} -> task:{taskId} > done");
}

static async Task DoWorkAsync(int taskId)
{
    Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} -> task:{taskId} > start");
    await Task.Delay(TimeSpan.FromSeconds(1));
    Console.WriteLine($"Thread: {Thread.CurrentThread.ManagedThreadId} -> task:{taskId} > done");
}

如果我们用下面的 sn-p 测试它们

await DoItAsync(DoWorkPretendAsync);
Console.WriteLine();
await DoItAsync(DoWorkAsync);

async Task DoItAsync(Func<int, Task> f)
{
    var tasks = Enumerable.Range(start: 0, count: 3).Select(i => f(i));
    Console.WriteLine("Before WhenAll");
    await Task.WhenAll(tasks);
    Console.WriteLine("After WhenAll");
}

我们可以看到DoWorkPretendAsync的任务是按顺序执行的。

Before WhenAll
Thread: 1 -> task:0 > start
Thread: 1 -> task:0 > done
Thread: 1 -> task:1 > start
Thread: 1 -> task:1 > done
Thread: 1 -> task:2 > start
Thread: 1 -> task:2 > done
After WhenAll

Before WhenAll
Thread: 1 -> task:0 > start
Thread: 1 -> task:1 > start
Thread: 1 -> task:2 > start
Thread: 5 -> task:0 > done
Thread: 5 -> task:2 > done
Thread: 7 -> task:1 > done
After WhenAll

注意事项:

  • 即使是真正的异步,所有任务都是由同一个线程启动的;
  • 在这个特定的运行中,两个任务由同一个线程 (id:5) 完成。这根本无法保证 - 一个任务可以在一个线程上启动,然后在池中的另一个线程上继续。

【讨论】:

  • 很好的解释,感谢您提供的示例来展示实际想法。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-06-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-01-10
  • 1970-01-01
相关资源
最近更新 更多