【问题标题】:How to prioritize tasks generated by await when using async / await使用 async / await 时如何对 await 生成的任务进行优先级排序
【发布时间】:2023-03-27 18:00:01
【问题描述】:

我有大量数据要处理。目前的代码可以简化如下:

public void ProcessData(string data)
{
    string resultOfA = doCpuBoundWorkA(data);

    string resultOfS1 = sendToServiceS1(resultOfA);

    string resultOfB = doCpuBoundWorkB(resultOfS1);

    string resultOfS2 = sendToServiceS2(resultOfB);

    string resultOfC = doCpuBoundWorkC(resultOfS2);
}

使用 Parallel.ForEach 调用 ProcessData。这种实现方式至少从两个角度来看都不是最优的。首先,对服务的调用是阻塞的,所以我们在等待调用返回时阻塞线程。其次,Parallel.ForEach 创建计划在线程池上执行的任务。线程池每 500 毫秒创建额外的线程(如果我没记错的话),并且因为“ProcessData”需要超过 500 毫秒才能完成,随着时间的推移,我们最终会得到数百个线程,这些线程大部分时间都在等待服务卷土重来。

我对“改进”的幼稚想法是这样的:

public async Task ProcessData(string data)
{
    string resultOfA = doCpuBoundWorkA(data);

    string resultOfS1 = await sendToServiceS1Async(resultOfA);

    string resultOfB = doCpuBoundWorkB(resultOfS1);

    string resultOfS2 = await sendToServiceS2Async(resultOfB);

    string resultOfC = doCpuBoundWorkC(resultOfS2);
}

我是 async/await 的新手,所以我对它实际发生的事情的理解可能完全错误。

使用 async/await 关键字,编译器将 ProcessData 的代码分解为多个任务。

  • Task-A:从 ProcessData 方法的开始直到对 ServiceA 的调用“命中”。
  • 任务 B:从我们获取对 ServiceA 的调用结果的那一刻起,一直到对 ServiceB 的调用“成功”为止。
  • Task-C:从我们获取对 ServiceB 的调用结果到 ProcessData 方法结束的那一刻。

因此,我们有三个“工作处理单元”而不是单个“处理单元”,其中每个部分都根据其在调度程序队列中的位置安排执行。

问题是,当 Task-B(第一个工作)被放入调度程序的队列时,我可能有数百个 Task-A,由 Parallel.ForEach 放在那里,到 Task-C 时(对于第一个工作)被放在调度程序的队列中,情况会更糟。

我希望数据尽可能快地通过,因此我需要能够优先考虑 Task-C 而不是 Task-B 而不是 Task-A。实现这一目标的最佳方法是什么?

INotifyCompletion, SynchronizationContext 浮现在脑海中,但它似乎是 async/await 的“黑暗角落”。 ParallelExtensionsExtras 具有带有优先级队列的 ReprioritizableTaskSchedulerQueuedTaskScheduler,但是我如何告诉 async/await 使用所需的调度程序?

John Skeet 在他的博客中谈到了这个问题:https://codeblog.jonskeet.uk/2010/11/02/configuring-waiting/

【问题讨论】:

    标签: c# async-await


    【解决方案1】:

    限制可能是比优先级更容易的方法。

    我认为您的问题最好由TPL Dataflow library 解决。它结合了并行和async 技术。

    您可以创建“块”并将它们“链接”在一起以形成“网格”(在您的情况下,网格是管道)。 TransformBlock 可用于同步和异步操作,还支持内置parallelismthrottling

    或者,您可以使用SemaphoreSlim(在方法开头调用WaitAsync,在结尾调用Release)对ProcessData 方法应用异步限制。但请考虑 TPL 数据流;我发现,如果人们正在做如此复杂的事情,那么他们通常会发现他们也可以在应用程序的其他部分使用 TPL Dataflow。

    【讨论】:

    • 我正在考虑看一下 TPL 数据流,但它不会遇到同样的问题,即对使用 async/await 创建的任务何时执行缺乏控制?我不确定节流是否能解决问题。只要我不破坏内存,就可以创建和处理许多 Task-A。我只想优先考虑“正在运行”的工作。
    • 作为熟悉自定义任务计划程序和同步上下文的人,我强烈建议您考虑限制。这要容易得多。
    • 斯蒂芬,在花了一些时间试图弄清楚如何使用节流之后,我空手而归。我可以为信号量提供没有固定的数字,因为可以运行的 Task-A 的数量取决于 ServiceA 或 ServiceB 返回所需的时间。我要做的是在执行 Task-A 的代码之前检查是否有任何 Task-B 或 Task-C 待处理。不幸的是,我无法知道这一点(这都是编译器的魔法)。是不是在ProcessData 中使用信号量等于把MaxDegreeOfParallelism 放在Paralle.ForEach 中?
    • @Nafas:是的,在这种情况下,SemaphoreSlimMaxDegreeOfParallelism 都在做同样的事情:应用一个简单的限制值。如果这还不够,您可能需要考虑 Reactive Extensions (Rx),它具有更强大的节流机制。当你必须处理时间时,Rx 特别好。
    • 感谢您的帮助。在我看来,在这种情况下使用 async/await 没有任何意义。一方面,我将通过使用async/await 来获取线程,另一方面,我不会使用它们,因为我正在限制一些固定的数字。我会看看如果 Rx 可以在这里提供任何帮助。我怀疑它是否会有所帮助。这不是按固定数量或时间跨度进行节流。核心问题仍然相同:无法控制 await 的继续执行时间。
    【解决方案2】:

    你的问题:

    线程池每 500 毫秒创建额外的线程(如果我没记错的话),并且因为“ProcessData”需要超过 500 毫秒才能完成,随着时间的推移,我们最终会得到数百个线程,这些线程大部分时间都在等待回归的服务。

    可以通过在 ProcessData 上等待“修复”,并且仅在完成后生成新的。 (或执行类似 Task.WhenAll(...Task.Delay(500)..., ...ProcessData()) 之类的操作。

    ProcessData 内部的所有调用都是数据相关的,

    string resultOfA = doCpuBoundWorkA(data);
    string resultOfS1 = await sendToServiceS1Async(resultOfA);
    string resultOfB = doCpuBoundWorkB(resultOfS1);
    string resultOfS2 = await sendToServiceS2Async(resultOfB);
    string resultOfC = doCpuBoundWorkC(resultOfS2);
    

    IIRC,await 仅将执行传递到方法“ProcessData”之外。所以它只能允许其他异步方法运行,但是由于数据依赖,ProcessData内部的调用仍然是背靠背的。

    【讨论】:

      猜你喜欢
      • 2017-04-21
      • 2014-10-20
      • 2020-07-20
      • 2016-09-13
      • 1970-01-01
      • 2021-01-26
      • 1970-01-01
      • 2015-01-20
      • 2018-06-24
      相关资源
      最近更新 更多