【问题标题】:Nested async methods in a Parallel.ForEachParallel.ForEach 中的嵌套异步方法
【发布时间】:2020-06-08 23:35:43
【问题描述】:

我有一个在其中运行多个异步方法的方法。我必须遍历设备列表,并将设备传递给此方法。我注意到这需要很长时间才能完成,所以我正在考虑使用Parallel.ForEach,以便它可以同时针对多个设备运行此过程。

假设这是我的方法。

public async Task ProcessDevice(Device device) {
    var dev = await _deviceService.LookupDeviceIndbAsNoTracking(device);

    var result = await DoSomething(dev);
    await DoSomething2(dev);
}

然后 DoSomething2 也调用了一个异步方法。

public async Task DoSomething2(Device dev) {
    foreach(var obj in dev.Objects) {
        await DoSomething3(obj);
    }
}

设备列表随着时间的推移不断变大,因此该列表增长得越多,程序完成对每个设备运行ProcessDevice() 所需的时间就越长。我想一次处理多个设备。所以我一直在研究使用Parallel.ForEach

Parallel.ForEach(devices, async device => {
    try {
        await ProcessDevice(device);
    } catch (Exception ex) {
        throw ex;
    }
})

该程序似乎在设备完全处理之前完成。我也尝试过创建一个任务列表,然后为每个设备添加一个运行 ProcessDevice 的新任务到该列表中,然后等待 Task.WhenAll(listOfTasks);

var listOfTasks = new List<Task>();
foreach(var device in devices) {
    var task = Task.Run(async () => await ProcessDevice(device));
    listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);

但似乎在ProcessDevice() 实际完成运行之前,该任务被标记为已完成。

请原谅我对这个问题的无知,因为我是并行处理的新手,不知道发生了什么。是什么导致了这种行为,您是否可以提供任何文档来帮助我更好地了解该怎么做?

【问题讨论】:

标签: c# parallel-processing async-await task-parallel-library parallel.foreach


【解决方案1】:

您不能将asyncParallel.ForEach 混合使用。由于您的底层操作是异步的,因此您希望使用异步并发,而不是并行。异步并发最容易用WhenAll表示:

var listOfTasks = devices.Select(ProcessDevice).ToList();
await Task.WhenAll(listOfTasks);

【讨论】:

  • 当您说“您不能将异步与 Parallel.ForEach 混合使用”时,我们同意这只是他的情况,而不是一般情况?
  • @Thibaut 一般来说。我刚刚写了一个答案来更详细地解释原因。
【解决方案2】:

在您的上一个示例中存在一些问题:

var listOfTasks = new List<Task>();
foreach (var device in devices)
{
    await  Task.Run(async () => await ProcessDevice(device));
}
await Task.WhenAll(listOfTasks);

执行await Task.Run(async () =&gt; await ProcessDevice(device)); 意味着在前一个循环完成之前,您不会进入foreach 循环的下一个迭代。从本质上讲,您仍然一次只做一个。

此外,您没有向listOfTasks 添加任何任务,因此它保持为空,因此Task.WhenAll(listOfTasks) 立即完成,因为没有任务等待。

试试这个:

var listOfTasks = new List<Task>();
foreach (var device in devices)
{
    var task = Task.Run(async () => await ProcessDevice(device))
    listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);

【讨论】:

  • 我不知道将所有设备添加到任务列表中以同时并行化所有设备是否是个好主意。如果太多,这将比所有其他解决方案都慢。
  • 这在很大程度上取决于ProcessDevice() 真正在做什么以及有多少设备,但是是的,应该对其进行测试以查看其性能。一种不同但类似的方法是将device 拆分为多个批次,然后并行执行 5 个批次,每个批次同步进行。
  • @WSC - 抱歉我更新了我的问题,我确实有 listOfTasks.Add(task);但在问题中忘记了。
  • @JaronJohnson 您更新的代码没有意义。您没有将 task 设置为任何内容。更仔细地查看这个答案中的代码。
  • @GabrielLuci 我应该只是复制并粘贴我的代码,而不是在问题中再次写出它。再次更新它......
【解决方案3】:

我可以用Parallel.ForEach 解释这个问题。需要理解的重要一点是,当await 关键字作用于不完整的Task 时,它返回。如果方法签名允许(如果不是void),它将返回自己不完整的Task。然后由调用者使用 Task 对象来等待作业完成。

但是Parallel.ForEach中的第二个参数是Action&lt;T&gt;,这是一个void方法,也就是说不能返回Task,也就是说调用者(本例中是Parallel.ForEach)没有办法等到工作完成。

所以在你的情况下,一旦它到达await ProcessDevice(device),它就会返回并且没有任何东西等待它完成,所以它开始下一次迭代。当Parallel.ForEach 完成时,它所做的只是开始所有任务,但没有等待它们。

所以不要在异步代码中使用Parallel.ForEach

斯蒂芬的回答更合适。您也可以使用 WSC 的答案,但这对于较大的列表可能很危险。一次创建成百上千个新线程无助于提高性能。

【讨论】:

  • 谢谢你的解释,有道理。
  • 是否会使用 SemaphoreSlim 来限制并发级别,并结合这些答案中的任何一个来帮助保持平衡?再次抱歉,我对这个主题很陌生。我真的需要对这个话题进行自我教育,但不知道从哪里开始。
  • 可以,但对于您的情况,这可能会使事情过于复杂。 ProcessDevice() 是否在做一些处理器密集型的事情?还是发出 I/O 请求(读取文件、发出网络请求等)?
  • I/O 比处理器密集型要多。发生了一些计算,但它非常简单。它会发出一些网络请求并更新数据库记录。
  • 那么斯蒂芬的回答是最好的方法。一旦发送 I/O 请求,await 将返回并启动列表中的下一个请求。所以他们都立即开始,然后在Task.WhenAll,它将等待他们全部完成。它甚至可能都发生在同一个线程上。
【解决方案4】:

如果你要求的是这个,我不太确定,但我可以举例说明我们如何启动异步进程

 private readonly Func<Worker> _worker;

    private void StartWorkers(IEnumerable<Props> props){
    Parallel.ForEach(props, timestamp => { _worker.Invoke().Consume(timestamp); });
    }

建议阅读有关 Parallel.ForEach 的内容,因为它会为您做一些事情。

【讨论】:

    猜你喜欢
    • 2012-07-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多