【问题标题】:Is it ok to use Task.Run in ForEachAsync?在 ForEachAsync 中使用 Task.Run 可以吗?
【发布时间】:2022-02-28 12:15:24
【问题描述】:

我们使用 Nesting await in Parallel.ForEach 的 ForEachAsync 方法,最初是 suggested by  Stephen Toub(在他的博文底部)。

public static async Task ForEachAsync<T>(
        this IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body, Action<Task> handleException = null)
    {
        if (source.Any())
        {
            await Task.WhenAll(
                from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
                select Task.Run(async delegate
                {
                    using (partition)
                        while (partition.MoveNext())
                            await body(partition.Current).ContinueWith(t =>
                            {
                                //observe exceptions
                                if (t.IsFaulted)
                                {
                                    handleException?.Invoke(t);
                                }
                            });
                }));
        }
    }

但我们的一位同事担心在 Stephen Cleary 系列文章中描述的 Task.Run 开销 https://blog.stephencleary.com/2013/11/taskrun-etiquette-examples-even-in.html

当您在 ASP.NET 中将 await 与 Task.Run 一起使用时,就会引入(至少)四个效率问题:
• 额外(不必要的)线程 切换到 Task.Run 线程池线程。同样,当 线程完成请求,它必须进入请求上下文 (这不是实际的线程切换,但确实有开销)。
• 创建了额外的(不必要的)垃圾。异步编程是一种 权衡:以更高的响应速度为代价获得更高的响应能力 内存使用情况。在这种情况下,您最终会为 完全没有必要的异步操作。 • ASP.NET 线程池启发式被Task.Run“意外”抛出 借用线程池线程。我在这里没有很多经验, 但我的直觉告诉我启发式应该恢复得很好 如果意外任务真的很短并且不会处理它 如果意外任务持续超过两秒,则优雅地进行。
• ASP.NET 无法提前终止请求,即,如果 客户端断开连接或请求超时。在同步情况下, ASP.NET 知道请求线程并且可以中止它。在里面 异步情况下,ASP.NET 不知道其他辅助线程池 线程是“为”该请求的。可以通过使用来解决此问题 取消令牌,但这超出了本博文的范围。

我的问题是可以将 Task.Run 用于 ForEachAsync 还是存在更好的方法来以受控的 dop(并行度)并行运行多个异步任务? 例如,我要处理 400 个项目,bun 并行运行不超过 100 个项目。

我们在 .Net 和 .Net Core 环境中都使用 ForEachAsync 方法,所以如果不同环境的答案会有所不同,我很高兴知道两者。

更新以阐明我们正在使用的技术:
我们有 Windows 服务/控制台(用 .Net4.6.1 编写)从 DB 读取数千条记录,然后将它们单独并行(例如,dop=100)发布到 web api 服务(我们考虑分批发送它们,但没有尚未实施)。
我们还有带有后台托管服务的 Asp.Net Core 服务,该服务会定期(例如每 10 秒)提取项目页面(例如最多 400 个),然后并行(例如,dop=100)将它们保存到各个 Azure blob。

更新:在 .NET 6 中考虑 use new API  Parallel.ForEachAsync,这是一种安排异步工作的方法,允许您控制并行度

【问题讨论】:

  • 你真的在使用 ASP.NET 吗?我强烈避免在 ASP.NET 上进行Task.Run 并行处理(包括ForEachAsync 的这种实现)。
  • 如果此代码将在 ASP.NET 上使用,您的同事的担忧是有道理的,调用者可能希望关闭与 degreeOfParallelism: 1 的并行性。此代码将产生 Stephen Cleary 在其博客中列出的所有不必要的开销。
  • @StephenCleary,我们在 Asp.Net Core IHostedService 上使用并行处理,但不在 http 请求上 - 请参阅我对问题的更新

标签: .net multithreading parallel-processing async-await


【解决方案1】:

以异步方式处理 MDOP 为 100 的 400 条消息的简单方法是使用 ActionBlock&lt;T&gt;。这样的事情会起作用:

public class ActionBlockExample
{
    private ActionBlock<int> actionBlock;

    public ActionBlockExample()
    {
        actionBlock = new ActionBlock<int>(x => ProcessMsg(x), new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 100
        });
    }

    public async Task Process()
    {
        foreach (var msg in Enumerable.Range(0, 400))
        {
            await actionBlock.SendAsync(msg);
        }
        actionBlock.Complete();
        await actionBlock.Completion;
    }

    private Task ProcessMsg(int msg) => Task.Delay(100);
}

默认情况下ActionBlock 有一个未绑定的输入缓冲区,它将同时处理所有 400 条消息,最多并行处理 100 条。这里不需要Task.Run,因为所有消息都在后台处理。

【讨论】:

  • 谢谢,我已经使用您建议的 ActionBlock 进行了一些测试,并与 ForEachAsync 进行了比较,但结果不是结论性的(有时更快,有时另一种),并且差异不大。
【解决方案2】:

您可能需要考虑使用 Microsoft 的响应式框架(又名 Rx)- NuGet System.Reactive 并添加 using System.Reactive.Linq; - 然后您可以这样做:

public static async Task ForEachAsync<T>(
    this IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body)
{
    await source
        .ToObservable()
        .Select(t => Observable.FromAsync(() => body(t)))
        .Merge(degreeOfParallelism)
        .LastAsync();
}

您必须更改错误处理方式,但这是可能的。

【讨论】:

    猜你喜欢
    • 2014-03-07
    • 1970-01-01
    • 1970-01-01
    • 2017-09-07
    • 1970-01-01
    • 2012-10-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多