【问题标题】:Passing async method into Parallel.ForEach将异步方法传递给 Parallel.ForEach
【发布时间】:2018-11-27 09:59:27
【问题描述】:

我正在阅读 this post 关于 Parallel.ForEach 的内容,其中指出“Parallel.ForEach 与传入异步方法不兼容。”

所以,为了检查我写了这段代码:

static async Task Main(string[] args)
{
    var results = new ConcurrentDictionary<string, int>();

    Parallel.ForEach(Enumerable.Range(0, 100), async index =>
    {
        var res = await DoAsyncJob(index);
        results.TryAdd(index.ToString(), res);
    });         

    Console.ReadLine();
}

static async Task<int> DoAsyncJob(int i)
{
    Thread.Sleep(100);
    return await Task.FromResult(i * 10);
}

此代码同时填写results 字典。

顺便说一句,我创建了一个ConcurrentDictionary&lt;string, int&gt; 类型的字典,因为如果我有ConcurrentDictionary&lt;int, int&gt;,当我在调试模式下探索它的元素时,我看到元素是按键排序的,我认为因此添加了元素。

所以,我想知道我的代码是否有效?如果它“与传入异步方法不兼容”,为什么它运行良好?

【问题讨论】:

  • 不要Parallel.ForEach 用于 CPU 密集型计算,不识别异步方法。它不会等待它们,本质上是将它们转换为async void 即发即弃的呼叫。无论如何,您的方法不是异步的,因此无法说出 正确的 调用是什么样的
  • @PanagiotisKanavos 请发布您的信息来源。谢谢
  • I want to know is my code is valid?No. why it works well? 它没有,但你没有意识到它,因为 a) 它不执行任何异步工作。
  • Parallel.ForEach - “安排多个线程并行执行以下工作” - async - “好吧,这个线程没有什么有用的工作做,直到其他事情完成这个等待”。即使它确实有效(它没有,正如 Panagitotis 所说,它完全是同步的),你以一种奇怪的方式组合事物,过度分配资源然后忽略它们。
  • @Seabizkit 没有什么可说服的。没有接受任务的重载。没有它,就无法await 任何异步调用。 async 不会等待任何东西,也不会让任何东西异步运行。 async void 调用无法等待,这就是为什么它们被视为事件处理程序之外的错误。

标签: c# async-await parallel.foreach


【解决方案1】:

此代码有效只是因为DoAsyncJob 并不是真正的异步方法。 async 不会使方法异步工作。等待 Task.FromResult 返回的完成任务也是同步的。 async Task Main 不包含任何异步代码,这会导致编译器警告。

一个演示Parallel.ForEach 不能与异步方法一起工作的示例应该调用一个真正的异步方法:

    static async Task Main(string[] args)
    {
        var results = new ConcurrentDictionary<string, int>();

        Parallel.ForEach(Enumerable.Range(0, 100), async index =>
        {
            var res = await DoAsyncJob(index);
            results.TryAdd(index.ToString(), res);
        });  
        Console.WriteLine($"Items in dictionary {results.Count}");
    }

    static async Task<int> DoAsyncJob(int i)
    {
        await Task.Delay(100);
        return i * 10;
    }

结果将是

Items in dictionary 0

Parallel.ForEach 没有接受Func&lt;Task&gt; 的重载,它只接受Action 代表。这意味着它不能等待任何异步操作。

async index 被接受,因为它隐含地是 async void delegate。就Parallel.ForEach而言,它只是一个Action&lt;int&gt;

结果是Parallel.ForEach 触发了 100 个任务并且从不等待它们完成。这就是应用程序终止时字典仍然为空的原因。

【讨论】:

  • 您在评论中说我可以使用 list.Select(item=>Task.Run(...)) 为什么我应该在 Select 方法中使用 Task.Run()?我可以像这样使用: var tasks = Enumerable.Range(0, 100).Select(async index => { var res = await DoAsyncJob(index); results.TryAdd(index.ToString(), res); }) ;等待 Task.WhenAll(tasks);
  • Async 用于释放将等待“I/O”内容的线程,因此在这种情况下,它不会释放一些线程,因为 foreach 中的操作可能是 db 访问例如...您是说如果调用者 aka Parallel.ForEach 可以返回 Task 会是这种情况,而事实并非如此... Parallel 中的所有 Task 操作都无法正确实现异步库。我说的好吗?
  • @DmitryS 因为你没有解释你真正想要做什么。如果你想在后台处理一些数据项而不阻塞,你可以使用.Select(..=&gt;Task.Run..)。如果您想测试多种算法并获得第一个结果,则同样适用。
  • @Seabizkit Parallel.ForEach 专门用于数据并行。这就是为什么它不为任务提供任何重载,并且实际上使用 current 线程进行处理。它将输入数据划分为与内核数量一样多的分区,并为每个分区使用 一个 任务以最小化跨线程同步。这是与异步或并发操作完全不同的问题
  • @PanagiotisKanavos 谢谢!我认为这与我正在寻找的信息越来越接近。
【解决方案2】:

async 方法是一种启动并返回 Task 的方法。

你的代码在这里

Parallel.ForEach(Enumerable.Range(0, 100), async index =>
{
    var res = await DoAsyncJob(index);
    results.TryAdd(index.ToString(), res);
});        

并行运行 100 次异步方法。也就是说,它并行化了任务创建,而不是整个任务。当ForEach 返回时,您的任务正在运行,但不一定完成。

您的代码有效,因为DoAsyncJob() 实际上不是异步的 - 您的任务在返回时完成。 Thread.Sleep() 是一种同步方法。 Task.Delay() 是它的异步等价物。

了解CPU-bound and I/O-bound operations 之间的区别。正如其他人已经指出的那样,并行性(和Parallel.ForEach)适用于受 CPU 限制的操作,异步编程是不合适的。

【讨论】:

    【解决方案3】:

    如果你已经有异步工作,则不需要Parallel.ForEach

    static async Task Main(string[] args)
    {
    
        var results = await new Task.WhenAll(
            Enumerable.Range(0, 100)
            Select(i => DoAsyncJob(I)));
    
        Console.ReadLine();
    }
    

    关于你的异步工作,你要么一直异步:

    static async Task<int> DoAsyncJob(int i)
    {
        await Task.Delay(100);
        return await Task.FromResult(i * 10);
    }
    

    更好:

    static async Task<int> DoAsyncJob(int i)
    {
        await Task.Delay(100);
        return i * 10;
    }
    

    或者根本没有:

    static Task<int> DoAsyncJob(int i)
    {
        Thread.Sleep(100);
        return Task.FromResult(i * 10);
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-04-11
      • 1970-01-01
      • 2018-11-09
      相关资源
      最近更新 更多