【问题标题】:Why is Parallel.ForEach much faster then AsParallel().ForAll() even though MSDN suggests otherwise?为什么 Parallel.ForEach 比 AsParallel().ForAll() 快得多,即使 MSDN 另有建议?
【发布时间】:2014-09-18 08:32:54
【问题描述】:

我一直在做一些调查,看看我们如何创建一个通过树运行的多线程应用程序。

为了找到如何以最佳方式实现这一点,我创建了一个测试应用程序,该应用程序通过我的 C:\ 磁盘运行并打开所有目录。

class Program
{
    static void Main(string[] args)
    {
        //var startDirectory = @"C:\The folder\RecursiveFolder";
        var startDirectory = @"C:\";

        var w = Stopwatch.StartNew();

        ThisIsARecursiveFunction(startDirectory);

        Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);

        Console.ReadKey();
    }

    public static void ThisIsARecursiveFunction(String currentDirectory)
    {
        var lastBit = Path.GetFileName(currentDirectory);
        var depth = currentDirectory.Count(t => t == '\\');
        //Console.WriteLine(depth + ": " + currentDirectory);

        try
        {
            var children = Directory.GetDirectories(currentDirectory);

            //Edit this mode to switch what way of parallelization it should use
            int mode = 3;

            switch (mode)
            {
                case 1:
                    foreach (var child in children)
                    {
                        ThisIsARecursiveFunction(child);
                    }
                    break;
                case 2:
                    children.AsParallel().ForAll(t =>
                    {
                        ThisIsARecursiveFunction(t);
                    });
                    break;
                case 3:
                    Parallel.ForEach(children, t =>
                    {
                        ThisIsARecursiveFunction(t);
                    });
                    break;
                default:
                    break;
            }

        }
        catch (Exception eee)
        {
            //Exception might occur for directories that can't be accessed.
        }
    }
}

然而,我遇到的是,在模式 3 (Parallel.ForEach) 下运行时,代码在大约 2.5 秒内完成(是的,我有一个 SSD ;))。在没有并行化的情况下运行代码大约需要 8 秒。在模式 2 (AsParalle.ForAll()) 下运行代码需要几乎无限的时间。

在进程资源管理器中签入时,我还遇到了一些奇怪的事实:

Mode1 (No Parallelization):
Cpu:     ~25%
Threads: 3
Time to complete: ~8 seconds

Mode2 (AsParallel().ForAll()):
Cpu:     ~0%
Threads: Increasing by one per second (I find this strange since it seems to be waiting on the other threads to complete or a second timeout.)
Time to complete: 1 second per node so about 3 days???

Mode3 (Parallel.ForEach()):
Cpu:     100%
Threads: At most 29-30
Time to complete: ~2.5 seconds

我觉得特别奇怪的是 Parallel.ForEach 似乎忽略了任何仍在运行的父线程/任务,而 AsParallel().ForAll() 似乎在等待上一个任务完成(这不会很快所有父任务仍在等待其子任务完成)。

我在 MSDN 上读到的内容是:“如果可能,更喜欢 ForAll 而不是 ForEach”

来源:http://msdn.microsoft.com/en-us/library/dd997403(v=vs.110).aspx

有人知道为什么会这样吗?

编辑 1:

按照 Matthew Watson 的要求,我首先将树加载到内存中,然后再循环遍历它。现在树的加载是按顺序完成的。

然而结果是一样的。 Unparallelized 和 Parallel.ForEach 现在在大约 0.05 秒内完成整个树,而 AsParallel().ForAll 仍然每秒只走大约 1 步。

代码:

class Program
{
    private static DirWithSubDirs RootDir;

    static void Main(string[] args)
    {
        //var startDirectory = @"C:\The folder\RecursiveFolder";
        var startDirectory = @"C:\";

        Console.WriteLine("Loading file system into memory...");
        RootDir = new DirWithSubDirs(startDirectory);
        Console.WriteLine("Done");


        var w = Stopwatch.StartNew();

        ThisIsARecursiveFunctionInMemory(RootDir);

        Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);

        Console.ReadKey();
    }        

    public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory)
    {
        var depth = currentDirectory.Path.Count(t => t == '\\');
        Console.WriteLine(depth + ": " + currentDirectory.Path);

        var children = currentDirectory.SubDirs;

        //Edit this mode to switch what way of parallelization it should use
        int mode = 2;

        switch (mode)
        {
            case 1:
                foreach (var child in children)
                {
                    ThisIsARecursiveFunctionInMemory(child);
                }
                break;
            case 2:
                children.AsParallel().ForAll(t =>
                {
                    ThisIsARecursiveFunctionInMemory(t);
                });
                break;
            case 3:
                Parallel.ForEach(children, t =>
                {
                    ThisIsARecursiveFunctionInMemory(t);
                });
                break;
            default:
                break;
        }
    }
}

class DirWithSubDirs
{
    public List<DirWithSubDirs> SubDirs = new List<DirWithSubDirs>();
    public String Path { get; private set; }

    public DirWithSubDirs(String path)
    {
        this.Path = path;
        try
        {
            SubDirs = Directory.GetDirectories(path).Select(t => new DirWithSubDirs(t)).ToList();
        }
        catch (Exception eee)
        {
            //Ignore directories that can't be accessed
        }
    }
}

编辑 2:

阅读 Matthew 评论的更新后,我尝试将以下代码添加到程序中:

ThreadPool.SetMinThreads(4000, 16);
ThreadPool.SetMaxThreads(4000, 16);

但这不会改变 AsParallel 的执行方式。前 8 个步骤仍在瞬间执行,然后减速到 1 步/秒。

(额外说明,当我无法通过 Directory.GetDirectories() 周围的 Try Catch 块访问目录时,我忽略了发生的异常)

编辑 3:

另外,我主要感兴趣的是 Parallel.ForEach 和 AsParallel.ForAll 之间的区别,因为对我来说很奇怪,由于某种原因,第二个为它所做的每个递归创建一个线程,而第一个处理所有最多约 30 个线程。 (以及为什么 MSDN 建议使用 AsParallel,即使它创建了这么多线程,超时时间约为 1 秒)

编辑 4:

我发现的另一个奇怪的事情: 当我尝试将线程池上的 MinThreads 设置为 1023 以上时,它似乎忽略了该值并缩小到 8 或 16 左右: ThreadPool.SetMinThreads(1023, 16);

当我使用 1023 时,它会非常快地完成前 1023 个元素,然后又回到我一直经历的缓慢速度。

注意:实际上现在创建了超过 1000 个线程(相比之下,整个 Parallel.ForEach 为 30 个)。

这是否意味着 Parallel.ForEach 在处理任务方面更加智能?

更多信息,当您将值设置为 1023 以上时,此代码会打印两次 8 - 8:(当您将值设置为 1023 或更低时,它会打印正确的值)

        int threadsMin;
        int completionMin;
        ThreadPool.GetMinThreads(out threadsMin, out completionMin);
        Console.WriteLine("Cur min threads: " + threadsMin + " and the other thing: " + completionMin);

        ThreadPool.SetMinThreads(1023, 16);
        ThreadPool.SetMaxThreads(1023, 16);

        ThreadPool.GetMinThreads(out threadsMin, out completionMin);
        Console.WriteLine("Now min threads: " + threadsMin + " and the other thing: " + completionMin);

编辑 5:

应 Dean 的要求,我创建了另一个案例来手动创建任务:

case 4:
    var taskList = new List<Task>();
    foreach (var todo in children)
    {
        var itemTodo = todo;
        taskList.Add(Task.Run(() => ThisIsARecursiveFunctionInMemory(itemTodo)));
    }
    Task.WaitAll(taskList.ToArray());
    break;

这也与 Parallel.ForEach() 循环一样快。所以我们仍然没有答案为什么 AsParallel().ForAll() 这么慢。

【问题讨论】:

  • 使用ThreadPool.SetMinThreads(4000, 4000);,您将 IO 完成端口线程设置为一个疯狂的数字。改用ThreadPool.SetMinThreads(4000, 16);SetMaxThreads() 相同)
  • 我现在已经这样做了,但不知何故仍然会遇到相同的结果。对于模式 2,我看到每秒在资源监视器中弹出 1 个额外线程。此外,当我启用 Console.WriteLine 时,我看到它以每秒大约 1 级的速度在我的磁盘中移动。模式 1 和 3 仍然在不到 1 秒(内存中)内执行我的整个磁盘(80.173 个元素)
  • 你能使用面向任务的版本吗——意味着你等待递归调用的任务,直到调用完成才开始新任务?这里的问题是,当您只有 4 个 CPU 内核之类的东西时,当这个问题是内存中且受 CPU 限制时,您很快就会将线程数淹没。
  • @Dean,我创建了第 4 个案例来测试您的解决方案,但也发现这与 Parallel.ForEach 一样快。我认为这与内存操作和 CPU 没有太大关系,因为我几乎看不到 CPU 活动。我认为这与 AsParallel().ForAll() 方法中某处的某种超时有关。
  • @Devedse,我认为这仍然会在启动时启动所有线程。我没有完全想到这一点,但我认为你会在 for 循环中放置某种等待,以便循环在递归完成之前不会继续。当它沿着目录树向下移动时,您会获得很多深度,并且可能会获得多个线程,但至少它不会一次性创建所有线程。您还可以使用 Thread.Current.ID 调查线程,也许记录每种模型创建了多少线程。

标签: c# multithreading performance foreach parallel-processing


【解决方案1】:

这个问题很容易调试,当你遇到线程问题时这是一种不寻常的奢侈。您的基本工具是 Debug > Windows > Threads 调试器窗口。向您显示活动线程并让您查看它们的堆栈跟踪。您会很容易地看到,一旦速度变慢,您将有 几十个 处于活动状态的线程全部卡住。他们的堆栈跟踪看起来都一样:

    mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout, bool exitContext) + 0x16 bytes  
    mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout) + 0x7 bytes 
    mscorlib.dll!System.Threading.ManualResetEventSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x182 bytes    
    mscorlib.dll!System.Threading.Tasks.Task.SpinThenBlockingWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x93 bytes   
    mscorlib.dll!System.Threading.Tasks.Task.InternalRunSynchronously(System.Threading.Tasks.TaskScheduler scheduler, bool waitForCompletion) + 0xba bytes  
    mscorlib.dll!System.Threading.Tasks.Task.RunSynchronously(System.Threading.Tasks.TaskScheduler scheduler) + 0x13 bytes  
    System.Core.dll!System.Linq.Parallel.SpoolingTask.SpoolForAll<ConsoleApplication1.DirWithSubDirs,int>(System.Linq.Parallel.QueryTaskGroupState groupState, System.Linq.Parallel.PartitionedStream<ConsoleApplication1.DirWithSubDirs,int> partitions, System.Threading.Tasks.TaskScheduler taskScheduler) Line 172  C#
// etc..

每当您看到这样的事情时,您应该立即想到消防水管问题。可能是线程中第三常见的错误,排在竞争和死锁之后。

你可以推断出,既然你知道了原因,代码的问题是每个完成的线程都会增加 N 个线程。其中 N 是目录中子目录的平均数。实际上,线程的数量呈指数增长,这总是很糟糕。它只会在 N = 1 时保持控制,这当然不会在典型磁盘上发生。

请注意,就像几乎所有线程问题一样,这种不当行为往往会重复得很差。您机器中的 SSD 往往会将其隐藏起来。您机器中的 RAM 也是如此,该程序很可能会在您第二次运行时快速完成且无故障。由于您现在将从文件系统缓存而不是磁盘读取,因此速度非常快。修补 ThreadPool.SetMinThreads() 也会隐藏它,但它无法修复它。它永远不会解决任何问题,它只会隐藏它们。因为无论发生什么,指数级数总是会压倒设定的最小线程数。您只能希望它在此之前完成对驱动器的迭代。对有大驱动力的用户寄予厚望。

ParallelEnumerable.ForAll() 和 Parallel.ForEach() 之间的区别现在也许也很容易解释了。您可以从堆栈跟踪中看出 ForAll() 做了一些顽皮的事情,RunSynchronously() 方法会阻塞,直到所有线程都完成。阻塞是线程池线程不应该做的事情,它会阻塞线程池并且不允许它为另一个作业安排处理器。并且具有您观察到的效果,线程池很快就会被等待 N 个其他线程完成的线程所淹没。这没有发生,他们在池中等待并且没有被安排,因为已经有很多他们处于活动状态。

这是一个死锁场景,很常见,但线程池管理器有一个解决方法。它监视活动的线程池线程,并在它们没有及时完成时介入。然后它允许一个 extra 线程启动,比 SetMinThreads() 设置的最小值多一个。但是不超过 SetMaxThreads() 设置的最大值,有太多的活动 tp 线程是有风险的,并且可能会触发 OOM。这确实解决了死锁,它完成了 ForAll() 调用之一。但这以非常慢的速度发生,线程池每秒只执行两次。在它赶上之前你会失去耐心。

Parallel.ForEach() 没有这个问题,它不会阻塞,所以不会弄乱池。

似乎是解决方案,但请记住,您的程序仍在对您的机器内存进行喷射,向池中添加更多等待的 tp 线程。这也可能使您的程序崩溃,只是不太可能,因为您有很多内存并且线程池不会使用很多内存来跟踪请求。不过有些程序员accomplish that as well.

解决方案很简单,只是不要使用线程。 有害,只有一个磁盘时没有并发。它确实喜欢被多个线程征用。在主轴驱动器上尤其糟糕,磁头寻道非常非常慢。 SSD 做得更好,但是仍然需要 50 微秒的时间,这是您不想要或不需要的开销。访问磁盘的理想线程数始终是一个

【讨论】:

    【解决方案2】:

    首先要注意的是,您正在尝试并行化一个 IO-bound 操作,这会严重扭曲时序。

    要注意的第二件事是并行任务的性质:您正在递归地降低目录树。如果您创建多个线程来执行此操作,则每个线程很可能同时访问磁盘的不同部分 - 这将导致磁盘读取头到处跳跃并大大减慢速度。

    尝试更改您的测试以创建内存树,并改为使用多个线程访问它。然后,您将能够正确地比较时间,而不会扭曲结果而超出所有用处。

    此外,您可能正在创建大量线程,并且它们(默认情况下)将是线程池线程。当线程数量超过处理器内核数量时,拥有大量线程实际上会减慢速度。

    还请注意,当您超过线程池最小线程数(由ThreadPool.GetMinThreads() 定义)时,线程池管理器会在每个新线程池线程创建之间引入延迟。 (我认为每个新线程大约需要 0.5 秒)。

    另外,如果线程数超过ThreadPool.GetMaxThreads() 返回的值,创建线程将阻塞,直到其他线程之一退出。我认为这很可能会发生。

    您可以通过调用ThreadPool.SetMaxThreads()ThreadPool.SetMinThreads() 来增加这些值来检验这个假设,看看它是否有什么不同。

    (最后,请注意,如果您真的想从 C:\ 递归下降,当它到达受保护的 OS 文件夹时,您几乎肯定会遇到 IO 异常。)

    注意:像这样设置最大/最小线程池线程:

    ThreadPool.SetMinThreads(4000, 16);
    ThreadPool.SetMaxThreads(4000, 16);
    

    跟进

    我已经使用上述设置的线程池线程数尝试了您的测试代码,结果如下(不是在我的整个 C:\ 驱动器上运行,而是在较小的子集上运行):

    • 模式 1 耗时 06.5 秒。
    • 模式 2 耗时 15.7 秒。
    • 模式 3 耗时 16.4 秒。

    这符合我的预期;添加大量线程来执行此操作实际上使其比单线程慢,并且两种并行方法花费的时间大致相同。


    如果其他人想对此进行调查,这里有一些确定性测试代码(OP 的代码不可重现,因为我们不知道他的目录结构)。

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading.Tasks;
    
    namespace Demo
    {
        internal class Program
        {
            private static DirWithSubDirs RootDir;
    
            private static void Main()
            {
                Console.WriteLine("Loading file system into memory...");
                RootDir = new DirWithSubDirs("Root", 4, 4);
                Console.WriteLine("Done");
    
                //ThreadPool.SetMinThreads(4000, 16);
                //ThreadPool.SetMaxThreads(4000, 16);
    
                var w = Stopwatch.StartNew();
                ThisIsARecursiveFunctionInMemory(RootDir);
    
                Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds);
                Console.ReadKey();
            }
    
            public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory)
            {
                var depth = currentDirectory.Path.Count(t => t == '\\');
                Console.WriteLine(depth + ": " + currentDirectory.Path);
    
                var children = currentDirectory.SubDirs;
    
                //Edit this mode to switch what way of parallelization it should use
                int mode = 3;
    
                switch (mode)
                {
                    case 1:
                        foreach (var child in children)
                        {
                            ThisIsARecursiveFunctionInMemory(child);
                        }
                        break;
    
                    case 2:
                        children.AsParallel().ForAll(t =>
                        {
                            ThisIsARecursiveFunctionInMemory(t);
                        });
                        break;
    
                    case 3:
                        Parallel.ForEach(children, t =>
                        {
                            ThisIsARecursiveFunctionInMemory(t);
                        });
                        break;
    
                    default:
                        break;
                }
            }
        }
    
        internal class DirWithSubDirs
        {
            public List<DirWithSubDirs> SubDirs = new List<DirWithSubDirs>();
    
            public String Path { get; private set; }
    
            public DirWithSubDirs(String path, int width, int depth)
            {
                this.Path = path;
    
                if (depth > 0)
                    for (int i = 0; i < width; ++i)
                        SubDirs.Add(new DirWithSubDirs(path + "\\" + i, width, depth - 1));
            }
        }
    }
    

    【讨论】:

    • 是的,文件系统已经针对顺序访问进行了优化——它会更快。
    • 请看我的更新,我已将树更改为先加载到内存中,但结果仍然相同。
    • 非常描述性的分析+1
    • 我的主要问题是为什么 Parallel.ForEach 似乎能够在 0.03 秒内完成此代码,而 AsParallel().ForAll() 需要几天时间(+ 必须为每个递归级别创建一个额外的线程)。还有为什么 MSDN 提出相反的建议?
    • 我觉得很奇怪。我已经准确地添加了该代码,但模式 2 仍然需要几乎无限的时间。 (仍然一次只走一步)
    【解决方案3】:

    Parallel.For 和 .ForEach 方法在内部实现等同于在任务中运行迭代,例如像这样的循环:

    Parallel.For(0, N, i => 
    { 
      DoWork(i); 
    });
    

    相当于:

    var tasks = new List<Task>(N); 
    for(int i=0; i<N; i++) 
    { 
    tasks.Add(Task.Factory.StartNew(state => DoWork((int)state), i)); 
    } 
    Task.WaitAll(tasks.ToArray());
    

    从每个迭代潜在地与其他迭代并行运行的角度来看,这是一个不错的心理模型,但在现实中不会发生。事实上,并行并不一定每次迭代都使用一个任务,因为这比必要的开销要大得多。 Parallel.ForEach 尝试使用尽可能快地完成循环所需的最少任务数。当线程变得可用于处理这些任务时,它会启动任务,并且每个任务都参与一个管理方案(我认为它称为分块):一个任务要求完成多个迭代,获取它们,然后处理这些工作,然后返回更多。块大小因参与的任务数量、机器负载等而异。

    PLINQ 的 .AsParallel() 有不同的实现,但它仍然“可以”类似地将多次迭代提取到临时存储中,在线程中进行计算(但不是作为任务),并将查询结果放入一个小缓冲。 (您会根据 ParallelQuery 获得一些东西,然后将进一步的 .Whatever() 函数绑定到一组提供并行实现的替代扩展方法)。

    现在我们对这两种机制的工作原理有了一个小小的了解,我将尝试回答您最初的问题:

    那么为什么 .AsParallel() 比 Parallel.ForEach 慢?原因源于以下几点。任务(或此处的等效实现)NOT 阻塞类似 I/O 的调用。他们“等待”并释放 CPU 来做其他事情。但是(引用 C# 简而言之书):“PLINQ 无法在不阻塞线程的情况下执行 I/O-bound 工作”。这些调用是同步的。编写它们的目的是在(并且仅当)您正在执行诸如为每个任务下载网页而不会占用 CPU 时间的情况下提高并行度。

    您的函数调用与 I/O 绑定调用完全类似的原因是:您的一个线程(称为 T)阻塞并且什么都不做,直到它的所有子线程都已完成,这可能是一个缓慢的过程。 T 在等待子进程解除阻塞时本身并不是 CPU 密集型的,它只是在等待。因此,它与典型的 I/O 绑定函数调用相同。

    【讨论】:

    • 让我补充一点,如果您真的在编写磁盘树解析器,我 100% 同意 Hans Passant 在他的帖子中的观点。虽然从 CPU 的角度来看,它可能看起来是一个 I/O 密集型进程,但磁盘一次只能执行一个请求,并且您将同时处理数十个读取请求。如果您正在执行诸如下载网页之类的操作,那么可以,每个网页都由不同的服务器提供服务。您实际上所做的类似于对磁盘的拒绝服务攻击。
    • 嘿,Dean,你能看看我在 Hans 的回答下留下的评论吗?简而言之,我想知道 OP 仍然可以从 某种程度 的并行性中受益,只要它保持较小,而不会对他的存储进行 DoS 攻击。
    【解决方案4】:

    基于对How exactly does AsParallel work? 的接受回答

    .AsParallel.ForAll() 在调用 .ForAll() 之前转换回 IEnumerable

    所以它创建了 1 个新线程 + N 个递归调用(每个都生成一个新线程)。

    【讨论】:

    • 我觉得奇怪的是 Parallel.ForEach 不这样做,这使它变得更快。 (AsParallel.ForAll 每个节点需要 1 秒,因此如果有 100000 个节点,则需要一天以上的时间才能完成)与 Paralle.ForEach 相比为 0.03 秒
    • 嗯,我猜这是因为 Parallel.ForEach 可以完全控制迭代。也许它只是写得更好?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-01-15
    • 2014-04-21
    • 2015-02-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多