【问题标题】:Does Parallel.ForEach limit the number of active threads?Parallel.ForEach 是否限制活动线程的数量?
【发布时间】:2009-07-11 18:13:36
【问题描述】:

鉴于此代码:

var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});

所有 1000 个线程会几乎同时产生吗?

【问题讨论】:

    标签: c# .net c#-4.0 parallel-processing


    【解决方案1】:

    不,它不会启动 1000 个线程 - 是的,它会限制使用的线程数。 Parallel Extensions 使用适当数量的内核,具体取决于您实际拥有的数量已经忙碌的数量。它为每个内核分配工作,然后使用一种称为工作窃取的技术让每个线程有效地处理自己的队列,并且只需要在真正需要时进行任何昂贵的跨线程访问。

    查看PFX Team Blog 以获取有关其如何分配工作和各种其他主题的负载信息。

    请注意,在某些情况下,您也可以指定所需的并行度。

    【讨论】:

    • 我今晚使用 Parallel.ForEach(FilePathArray, path =>... 读取大约 24,000 个文件,为我读入的每个文件创建一个新文件。非常简单的代码。似乎即使是 6 个线程足以压倒我以 100% 利用率读取的 7200 RPM 磁盘。在几个小时内,我看到 Parallel 库分拆了 8,000 多个线程。我使用 MaxDegreeOfParallelism 进行测试,果然 8000 多个线程消失了。我有现在对其进行了多次测试,结果相同。
    • 可以为一些退化的“DoSomething”启动 1000 个线程。 (就像我目前正在处理生产代码中的问题一样,该问题未能设置限制并产生 200 多个线程,从而弹出 SQL 连接池。我建议为任何无法简单推理的工作设置 Max DOP大约是明确的 CPU 绑定。)
    【解决方案2】:

    在单核机器上... Parallel.ForEach 在多个线程之间处理的集合的分区(块),但该数字是根据一种算法计算得出的,该算法考虑到并似乎持续监控工作由分配给 ForEach 的线程完成。所以如果 ForEach 的主体部分调用长时间运行的 IO 绑定/阻塞函数,这会使线程等待,算法将产生更多线程并在它们之间重新划分集合。如果线程快速完成并且不阻塞 IO 线程,例如简单地计算一些数字,算法将增加(或实际上减少)线程数到算法认为吞吐量最佳的点(每次迭代的平均完成时间).

    基本上,所有各种并行库函数背后的线程池都会计算出要使用的最佳线程数。物理处理器内核的数量只是等式的一部分。内核数和产生的线程数之间没有简单的一对一关系。

    我没有发现有关取消和处理同步线程的文档很有帮助。希望MS能在MSDN中提供更好的例子。

    不要忘记,主体代码必须编写为在多个线程上运行,以及所有常见的线程安全考虑,框架并没有抽象出那个因素......但是。

    【讨论】:

    • "..如果 ForEach 的主体部分调用了长时间运行的阻塞函数,这将使线程等待,算法将产生更多线程.." - 在退化的情况下这意味着每个 ThreadPool 可能会创建尽可能多的线程。
    • 你是对的,对于 IO,它可能会在我自己调试时分配 +100 个线程
    【解决方案3】:

    很好的问题。在您的示例中,即使在四核处理器上,并行化水平也相当低,但经过一些等待,并行化水平会变得相当高。

    // Max concurrency: 5
    [Test]
    public void Memory_Operations()
    {
        ConcurrentBag<int> monitor = new ConcurrentBag<int>();
        ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
        var arrayStrings = new string[1000];
        Parallel.ForEach<string>(arrayStrings, someString =>
        {
            monitor.Add(monitor.Count);
            monitor.TryTake(out int result);
            monitorOut.Add(result);
        });
    
        Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
    }
    

    现在看看添加等待操作以模拟 HTTP 请求时会发生什么。

    // Max concurrency: 34
    [Test]
    public void Waiting_Operations()
    {
        ConcurrentBag<int> monitor = new ConcurrentBag<int>();
        ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
        var arrayStrings = new string[1000];
        Parallel.ForEach<string>(arrayStrings, someString =>
        {
            monitor.Add(monitor.Count);
    
            System.Threading.Thread.Sleep(1000);
    
            monitor.TryTake(out int result);
            monitorOut.Add(result);
        });
    
        Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
    }
    

    我还没有进行任何更改,并发/并行化水平已经大幅提升。 ParallelOptions.MaxDegreeOfParallelism 可以增加并发限制。

    // Max concurrency: 43
    [Test]
    public void Test()
    {
        ConcurrentBag<int> monitor = new ConcurrentBag<int>();
        ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
        var arrayStrings = new string[1000];
        var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
        Parallel.ForEach<string>(arrayStrings, options, someString =>
        {
            monitor.Add(monitor.Count);
    
            System.Threading.Thread.Sleep(1000);
    
            monitor.TryTake(out int result);
            monitorOut.Add(result);
        });
    
        Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
    }
    
    // Max concurrency: 391
    [Test]
    public void Test()
    {
        ConcurrentBag<int> monitor = new ConcurrentBag<int>();
        ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
        var arrayStrings = new string[1000];
        var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
        Parallel.ForEach<string>(arrayStrings, options, someString =>
        {
            monitor.Add(monitor.Count);
    
            System.Threading.Thread.Sleep(100000);
    
            monitor.TryTake(out int result);
            monitorOut.Add(result);
        });
    
        Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
    }
    

    我建议设置ParallelOptions.MaxDegreeOfParallelism。它不一定会增加正在使用的线程数,但会确保您只启动合理数量的线程,这似乎是您关心的问题。

    最后回答您的问题,不,您不会立即启动所有线程。如果您希望完美地并行调用,请使用 Parallel.Invoke,例如测试竞争条件。

    // 636462943623363344
    // 636462943623363344
    // 636462943623363344
    // 636462943623363344
    // 636462943623363344
    // 636462943623368346
    // 636462943623368346
    // 636462943623373351
    // 636462943623393364
    // 636462943623393364
    [Test]
    public void Test()
    {
        ConcurrentBag<string> monitor = new ConcurrentBag<string>();
        ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
        var arrayStrings = new string[1000];
        var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
        Parallel.ForEach<string>(arrayStrings, options, someString =>
        {
            monitor.Add(DateTime.UtcNow.Ticks.ToString());
            monitor.TryTake(out string result);
            monitorOut.Add(result);
        });
    
        var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
        Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
    }
    

    【讨论】:

      【解决方案4】:

      它会根据处理器/内核的数量计算出最佳线程数。它们不会同时生成。

      【讨论】:

        【解决方案5】:

        请参阅Does Parallel.For use one Task per iteration? 了解要使用的“心理模型”的想法。然而,作者确实声明“归根结底,重要的是要记住实现细节可能随时更改。”

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2011-07-23
          • 2010-12-19
          • 1970-01-01
          • 2017-08-06
          • 2016-05-18
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多