【问题标题】:Task.WaitAll() on continuation task only delays execution of original task?继续任务上的 Task.WaitAll() 只会延迟原始任务的执行?
【发布时间】:2014-03-28 07:21:31
【问题描述】:

背景:

我有一个控制台应用程序,它创建 Tasks 来处理来自数据库的数据(我们称它们为 Level1 任务)。每个任务都会再次创建自己的任务来处理分配给它的数据的每一部分(2 级任务)。

每个 Level2 任务都有一个与之关联的延续任务,以及用于在继续之前对延续任务执行 WaitAll 的代码。

我在.NET 4.0(没有async/await

问题:

这产生了一个问题——事实证明,如果以这种方式完成,在所有可用的 Level1 任务都安排好之前,没有任何 Level2 任务被启动。这无论如何都不是最佳的。

问题:

这似乎已通过更改代码以等待原始 Level2 任务及其继续任务来解决。但是,我不完全确定为什么会这样。

你有什么想法吗?

我唯一能想到的是 - 由于延续任务尚未开始,等待它完成是没有意义的。但即使是这样,我也希望至少有一些 Level2 任务已经开始。他们从来没有这样做过。

示例:

我创建了一个示例控制台应用程序来准确展示该行为:

  1. 按原样运行它,您会看到它首先安排所有任务,然后您才开始从 Level2 任务中获取实际写入的行。

  2. 但注释掉标记的代码块并取消注释替换,一切正常。

你能告诉我为什么吗?

public class Program
{
    static void Main(string[] args)
    {
        for (var i = 0; i < 100; i++)
        {
            Task.Factory.StartNew(() => SomeMethod());
            //Thread.Sleep(1000);
        }

        Console.ReadLine();
    }

    private static void SomeMethod()
    {
        var numbers = new List<int>();

        for (var i = 0; i < 10; i++)
        {
            numbers.Add(i);
        }

        var tasks = new List<Task>();

        foreach (var number in numbers)
        {
            Console.WriteLine("Before start task");

            var numberSafe = number;

            /* Code to be replaced START */

            var nextTask = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Got number: {0}", numberSafe);
            })
                .ContinueWith(task =>
                {
                    Console.WriteLine("Continuation {0}", task.Id);
                });

            tasks.Add(nextTask);

            /* Code to be replaced END */

            /* Replacement START */

            //var originalTask = Task.Factory.StartNew(() =>
            //{
            //    Console.WriteLine("Got number: {0}", numberSafe);
            //});

            //var contTask = originalTask
            //    .ContinueWith(task =>
            //    {
            //        Console.WriteLine("Continuation {0}", task.Id);
            //    });

            //tasks.Add(originalTask);
            //tasks.Add(contTask);

            /* Replacement END */
        }

        Task.WaitAll(tasks.ToArray());
    }
}

【问题讨论】:

标签: c# .net multithreading task-parallel-library


【解决方案1】:

我认为您看到了 Task Inlining 行为。引用MSDN:

在某些情况下,当一个Task处于等待状态时,它可能会在执行等待操作的线程上同步执行。这提高了性能,因为它通过利用本来会阻塞的现有线程来防止需要额外的线程,否则。为防止重入错误,任务内联仅在相关线程的本地队列中找到等待目标时才会发生。

你不需要 100 个任务就能看到这个。我已将您的程序修改为具有 4 个 1 级任务(我有四核 CPU)。每个 1 级任务只创建一个 2 级任务。

static void Main(string[] args)
{
    for (var i = 0; i < 4; i++)
    {
        int j = i;
        Task.Factory.StartNew(() => SomeMethod(j)); // j as level number
    }
}

在您的原始程序中,nextTask 是延续任务 - 所以我只是简化了方法。

private static void SomeMethod(int num)
{
    var numbers = new List<int>();

    // create only one level 2 task for representation purpose
    for (var i = 0; i < 1; i++)
    {
        numbers.Add(i);
    }

    var tasks = new List<Task>();

    foreach (var number in numbers)
    {
        Console.WriteLine("Before start task: {0} - thread {1}", num, 
                              Thread.CurrentThread.ManagedThreadId);

        var numberSafe = number;

        var originalTask = Task.Factory.StartNew(() =>
        {
            Console.WriteLine("Got number: {0} - thread {1}", num, 
                                    Thread.CurrentThread.ManagedThreadId);
        });

        var contTask = originalTask
            .ContinueWith(task =>
            {
                Console.WriteLine("Continuation {0} - thread {1}", num, 
                                    Thread.CurrentThread.ManagedThreadId);
            });

        tasks.Add(originalTask); // comment and un-comment this line to see change in behavior

        tasks.Add(contTask); // same as adding nextTask in your original prog.

    }

    Task.WaitAll(tasks.ToArray());
}

这是示例输出 - 关于评论 tasks.Add(originalTask); - 这是您的第一个块。

Before start task: 0 - thread 4
Before start task: 2 - thread 3
Before start task: 3 - thread 6
Before start task: 1 - thread 5
Got number: 0 - thread 7
Continuation 0 - thread 7
Got number: 1 - thread 7
Continuation 1 - thread 7
Got number: 3 - thread 7
Continuation 3 - thread 7
Got number: 2 - thread 4
Continuation 2 - thread 4

还有一些示例输出 - 保留 tasks.Add(originalTask); 这是你的第二个块

Before start task: 0 - thread 4
Before start task: 1 - thread 6
Before start task: 2 - thread 5
Got number: 0 - thread 4
Before start task: 3 - thread 3
Got number: 3 - thread 3
Got number: 1 - thread 6
Got number: 2 - thread 5
Continuation 0 - thread 7
Continuation 1 - thread 7
Continuation 3 - thread 7
Continuation 2 - thread 4

正如您在第二种情况下看到的,当您在启动它的同一线程上等待 originalTask 时,task inlining 将使其在同一线程上运行 - 这就是您之前看到 Got Number.. 消息的原因。

【讨论】:

  • 这很有趣 - 我会仔细阅读并考虑到这一点进行更多测试。
  • 我发现所有的答案都非常有帮助 - 因为你是第一个解释它的人(并且有一个有用的例子),我会将你的答案标记为已接受。谢谢
  • @JoannaTurban:很高兴它有帮助。如果您正在考虑替代方案,您应该看看那里的TPL.DataflowBufferBlock&lt;&gt;。它是异步/非阻塞生产者-消费者数据结构,可用于 async/await。
【解决方案2】:

您的代码的问题是阻塞 Task.WaitAll(tasks.ToArray())。默认的 TPL 任务调度程序不会为您以Factory.StartNew 开始的每个 任务使用新的池线程。然后你启动了 100 个 Level1 任务,每个任务用 Task.WaitAll 阻塞一个线程。

这会造成瓶颈。默认大小为ThreadPool,我得到约20 个线程并发运行,其中只有4 个实际同时执行(CPU 内核的数量)。

因此,某些任务将仅排队并稍后启动,因为较早的任务正在完成。要了解我的意思,请尝试像这样更改您的代码:

static void Main(string[] args)
{
    for (var i = 0; i < 100; i++)
    {
        Task.Factory.StartNew(() => SomeMethod(), 
            TaskCreationOptions.LongRunning);
    }

    Console.ReadLine();
}

TaskCreationOptions.LongRunning 会给你想要的行为,但这当然是一个错误的解决方案。

正确的解决方案是尽可能避免阻塞代码。如果必须全部执行,您应该只在最顶层执行阻塞等待。

为了解决这个问题,您的代码可以重构如下。请注意ContinueWhenAllUnwrap 和(可选)ExecuteSynchronously 的使用,这有助于消除阻塞代码并减少涉及的池线程数。这个版本的性能要好得多。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Program
{
    static void Main(string[] args)
    {
        var tasks = new List<Task>();

        for (var i = 0; i < 100; i++)
        {
            tasks.Add(Task.Factory.StartNew(() => SomeMethod(i)).Unwrap());
        }

        // blocking at the topmost level
        Task.WaitAll(tasks.ToArray());

        Console.WriteLine("Enter to exit...");
        Console.ReadLine();
    }

    private static Task<Task[]> SomeMethod(int n)
    {
        Console.WriteLine("SomeMethod " + n);

        var numbers = new List<int>();

        for (var i = 0; i < 10; i++)
        {
            numbers.Add(i);
        }

        var tasks = new List<Task>();

        foreach (var number in numbers)
        {
            Console.WriteLine("Before start task " + number);

            var numberSafe = number;

            var nextTask = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Got number: {0}", numberSafe);
            })
            .ContinueWith(task =>
            {
                Console.WriteLine("Continuation {0}", task.Id);
            }, TaskContinuationOptions.ExecuteSynchronously);

            tasks.Add(nextTask);
        }

        return Task.Factory.ContinueWhenAll(tasks.ToArray(), 
            result => result, TaskContinuationOptions.ExecuteSynchronously);
    }
}

理想情况下,在实际项目中,您应该尽可能坚持使用自然异步 API(例如,"Using SqlDataReader’s new async methods in .Net 4.5"),并仅将 Task.Run / Task.Factory.StartNew 用于 CPU 密集型计算任务。而对于服务器端应用程序(例如,ASP.NET Web API),Task.Run / Task.Factory.StartNew 通常只会增加冗余线程切换的开销。它不会加快 HTTP 请求的完成速度,除非您确实需要并行执行多个 CPU 密集型作业,从而损害可伸缩性。

我了解以下可能不是一个可行的选择,但我强烈建议升级到 VS2012+ 并使用async/await 来实现这样的逻辑。这将是非常值得的投资,因为它大大加快了编码过程并生成更简单、更干净且不易出错的代码。您仍然可以使用 Microsoft.Bcl.Async 定位 .NET 4.0。

【讨论】:

  • 感谢您的解释 - 这是一个值得考虑的有趣替代方案,并且可能是解决手头问题的更可靠的解决方案。
【解决方案3】:

如果我没记错的话,等待尚未安排好的任务可能会同步执行。 (请参阅here)在另一种情况下,这种行为将应用于您的代码也就不足为奇了。

请记住,线程行为高度依赖于实现和机器,这里发生的事情可能是这样的:

  • 考虑到调用Task.StartNew 和线程池的任务实际执行之间的延迟,大多数所谓的“1 级”任务(如果不是全部)都安排在第一个任务完成之前实际执行。
  • 由于默认任务调度程序使用 .NET ThreadPool,因此此处调度的所有任务都可能在 ThreadPool 线程上执行。
  • 一旦执行“1 级”任务,调度队列就会全部被“1 级”任务填满。
  • 每次执行“1 级”任务时,它都会根据需要安排尽可能多的“2 级”任务,但这些任务都安排在“1 级”任务之后。
  • 当“1 级”任务到达等待“2 级”任务的所有继续的点时,执行线程进入等待状态。
  • 由于许多 ThreadPool 线程处于等待状态,程序迅速达到 ThreadPool 饥饿,迫使 ThreadPool 分配新线程(总共可能超过 100 个)以解决饥饿问题
  • 一旦最后一个“1 级”任务进入等待状态,线程池至少会分配一个额外的线程。
  • 这个最后分配的额外线程现在可以首次执行“2 级”任务及其后续任务,因为所有“1 级”任务都已完成。
  • 一段时间后,一个“1 级”任务将完成他的所有“2 级”任务。然后,这个“1 级”任务将从等待中唤醒并完成其执行,从而释放另一个 ThreadPool 线程,并加速剩余“2 级”任务的执行和继续。

当您使用替代方法时发生的变化是,因为您直接在要等待的任务数组中引用“级别 2”任务,所以 Task.WaitAll 方法有机会同步执行“级别 2”任务而不是空转。 这在初始情况下不会发生,因为后续任务无法同步运行。

总之,在 ThreadPool 线程中等待是导致线程饥饿和观察到的奇怪行为的原因。虽然等待任务的代码中的优化使线程饥饿行为逐渐消失,但这显然不是您应该依赖的。

为了解决你最初的问题,你最好听从 lil-raz 的建议,放弃你的内在任务。

如果您可以访问 C# 5.0,您还可以考虑使用 async/await 模式来编写代码,而无需依赖等待。

【讨论】:

    【解决方案4】:

    我不得不说这段代码真的很不乐观,因为你创建了 100 个任务,并不意味着你会有 100 个线程,并且在每个任务中你创建了两个新任务,你对调度程序的超标。如果这些任务与数据库读取有关,为什么不将它们标记为长时间处理并丢弃内部任务?

    【讨论】:

    • 这只是一个演示行为的示例。真正的应用程序正在执行数据库调用,但这并不是问题的一部分。它们并不意味着以任何方式长期运行。问题是——为什么代码的变化会以这种方式影响处理。
    猜你喜欢
    • 2015-09-14
    • 2013-02-13
    • 1970-01-01
    • 2013-11-11
    • 2022-01-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多