【问题标题】:Purely computational tasks executing sequentially纯计算任务按顺序执行
【发布时间】:2014-09-18 11:41:41
【问题描述】:

我尝试使用 async-await 并行运行几种计算密集型方法。

我有一个包含大约 80,000 个对象的列表,我将这些对象输入到返回任务的函数中:

public static void Main(string[] args)
{
    //...blah blah blah...

    var runner = new Runner(); //in a nutshell, I manage to get an object that has an async method on it.
    runner.Run().Wait(); //and I wait for it to complete.

    //...blah blah blah...
}

我的跑步者对象中有以下方法(或多或少......这是一个人为的例子):

public async Task Run()
{
    var items = ... //this is my list
    var tasks = items.Select(i => this.RunItemAsync(i)).ToArray();

    //I don't get here until the tasks are all finished...every single one...

    await Task.WhenAll(tasks).ConfigureAwait(false);
}

private async Task RunItemAsync(Item i)
{
    var subItems = i.GetSubItems();

    var tasks = subItems.Select(s => s.RunSubItemAsync(s)).ToArray();

    //I don't get here until the sub item tasks are all finished...

    await Task.WhenAll(tasks).ConfigureAwait(false);

    //does computations, doesn't wait on any async i/o, etc
    await this.ProcessAsync(i).ConfigureAwait(false);
}

private async Task RunSubItemAsync(SubItem s)
{
    //does computations, doesn't wait on any async i/o, etc
    ...
}

在过去一年左右的时间里,我一直在为异步等待而苦苦挣扎,有时使用 TPL Dataflow 实现了出色的性能并做出了一些非常酷的事情,但每隔一段时间我就会遇到这样的事情,而我就是做不到似乎让任务“激活”了它们的并行能力。这个特定的项目将在大约 16 个内核的服务器上运行,所以我真的很想利用它。我的开发虚拟机只分配了 2 个内核,但这仍应允许任务激活和并行运行(过去也是如此)。

我的观察

  • 我设法通过在RunItemAsync 方法的开头插入一个小的await Task.Delay(1).ConfigureAwait(false) 来并行运行。我知道这会创建某种形式的“喘息空间”,允许另一个任务使用线程。然而,这还不够,因为它肮脏、不可靠,并且需要我有不可接受的延迟。
  • 没有前面提到的Delay 调用,所有任务都在Main Thread 上运行。这对我来说很明显,因为 Main 是启动这一切的函数。我对此没有任何问题,但我过去曾遇到过experiences,在new Thread-created 线程上运行任务导致它无法使用默认任务调度程序运行,并且每个任务最终都在该线程上按顺序运行。也许Main Thread 属于这一类?

我的问题

我了解运行 ToArray 本身不会执行异步代码。但是,我希望发生的是,当我的 RunItemAsync 方法到达其第一个 await 时,它将“停止”并允许调用 ToArray 的下一次迭代运行。

我也明白添加await Task.Delay 是有效的,因为它导致了我上面想要的结果。必须有某种方法可以做到这一点,而无需诉诸 await Task.Delay...

我怎样才能并行启动所有这些计算密集型任务,而不会无意中导致它们按顺序运行?

【问题讨论】:

  • 您的意思是items.Select(i => this.RunItemAsync(i)).ToArray() 还是您没有提供的Run(Item) 方法?
  • 已修复。这就是我中途重命名时发生的情况......
  • 这真的取决于进度如何前进(一次一个,或并行),具体取决于 RunSubItemAsync 所做的事情。如果没有这方面的详细信息,很难准确说出需要更改的内容
  • 它确实做了一堆数学运算。它遍历一个图,确定与SubItem s 表示的对象最近的节点,确定哪些节点最适合我正在计算的任务,执行一些无法异步获取的数据,对 @ 进行一些更改987654340@,然后退出。它完全同步。这种计算不能并行化,但我想做的是在不同的SubItems/Items 上并行运行许多这些计算。我遇到的问题是他们从不“释放”线程以便其他人可以执行,因此在完成之前没有其他任务开始。
  • 当你调用它 RunSubItemAsync 时,它表明它不是同步的。

标签: c# parallel-processing async-await


【解决方案1】:

目前有四种主要的并发库/技术可用。

  • async 最适合自然异步的单个操作,例如 I/O。
  • 任务并行库 (TPL) 最适合并行处理 CPU 密集型工作。
  • TPL 数据流跨越 async 和并行,为处理数据提供网格/管道抽象。
  • 响应式扩展 (Rx) 在概念上类似于 TPL 数据流,但没有并行功能,而是具有大量与时间相关的功能。

在您的情况下,您希望使用 TPL。一个简单的Parallel.ForEach 就足够了。

最后一点,同步代码(包括受 CPU 限制的并行代码)应该有一个同步 API;并且异步代码应该有一个异步 API。所以你希望你的 API 看起来是同步的,而不是异步的。

所以,是这样的:

public static void Main(string[] args)
{
  var runner = new Runner();
  runner.Run();
}

public void Run()
{
  var items = ...
  Parallel.ForEach(items, i => this.RunItem(i));
}

private void RunItem(Item i)
{
  var subItems = i.GetSubItems();
  Parallel.ForEach(subItems, s => s.RunSubItem(s));
  this.Process(i);
}

private void RunSubItem(SubItem s)
{
  SemaphoreSlim.Wait(); // instead of WaitAsync
  ...
}

【讨论】:

    【解决方案2】:

    我可以理解您在使用await 运行并行任务时遇到的问题,因为它的目的是“......暂停方法的执行,直到等待的任务完成”。如果你真的想并行做事,await 可能不是你想要的。

    await 的强大之处在于允许您顺序声明将顺序执行但彼此异步执行的操作,同时确保将操作结果编组回特定线程(当不使用 ConfigureAwait(false) 时)。您可以使用 await ...ConfigureAwait(false),但您正在破坏目的,并且生成的代码可能更慢...

    从您发布的内容来看,您似乎希望并行处理项目的子项目,并按顺序处理项目。例如同时处理第一项的所有子项,完成后,同时处理项二的所有子项,依此类推。如果不正确,您的代码并没有真正反映这一点。

    如果您想要并行启动多个任务,我会避免在单个任务上使用 await,而只使用单个 Task 对象。

    例如:

        public Task Run()
        {
            var items = GetItems();
            var tasks = items.Select(RunItemAsync);
    
            return Task.WhenAll(tasks);
        }
    
        private Task RunItemAsync(Item i)
        {
            var subItems = i.GetSubItems();
    
            var tasks = subItems.Select(s => Task.Factory.StartNew(()=>s.RunSubItem(s)));
    
            return Task.WhenAll(tasks).ContinueWith(_ => ProcessAsync(i), TaskContinuationOptions.ExecuteSynchronously);
        }
    

    但是,很难说你想用你发布的内容做什么。这似乎过于复杂。如果您想要并行执行大量未定义的任务,则生成许多 Tasks 并不是最好的方法。你的 CPUS/Core 数量是有限的,如果你的 CPU 绑定线程比内核多,那么你真的只是让事情变慢(参见context switch)。您可能想要的是一个 queue 任务,这些任务分批处理,最多 x 个任务(其中 x 是 CPU/核心数)。这可以通过Parallel.ForEach 完成。但是,无论哪种情况,您都在谈论与您设计的完全不同的东西。

    仅仅因为await 可用,并不意味着您必须将它用于所有线程场景。

    【讨论】:

      猜你喜欢
      • 2019-12-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多