【问题标题】:What does MaxDegreeOfParallelism do?MaxDegreeOfParallelism 有什么作用?
【发布时间】:2012-03-21 06:15:13
【问题描述】:

我正在使用 Parallel.ForEach 并且我正在做一些数据库更新,现在没有设置 MaxDegreeOfParallelism ,双核处理器机器会导致 sql 客户端超时,而其他四核处理器机器不知何故不会超时。

现在我无法控制我的代码运行时可用的处理器内核类型,但是我可以使用 MaxDegreeOfParallelism 更改一些设置,这些设置可能会同时运行更少的操作并且不会导致超时?

我可以增加超时时间,但这不是一个好的解决方案,如果在较低的 CPU 上我可以同时处理更少的操作,那将减少 cpu 的负载。

好的,我也阅读了所有其他帖子和 MSDN,但是将 MaxDegreeOfParallelism 设置为较低的值会使我的四核机器受到影响吗?

例如,如果 CPU 有两个内核,则使用 20,如果 CPU 有四个内核,则使用 40?

【问题讨论】:

    标签: c# .net-4.0 task-parallel-library parallel-extensions parallel.foreach


    【解决方案1】:

    答案是,它是整个并行运行的上限,与核数无关。

    因此,即使您因为等待 IO 或锁而没有使用 CPU,也不会并行运行额外的任务,只有您指定的最大值。

    为了解决这个问题,我编写了这段测试代码。那里有一个人工锁来刺激 TPL 使用更多线程。当您的代码等待 IO 或数据库时,也会发生同样的情况。

    class Program
    {
        static void Main(string[] args)
        {
            var locker = new Object();
            int count = 0;
            Parallel.For
                (0
                 , 1000
                 , new ParallelOptions { MaxDegreeOfParallelism = 2 }
                 , (i) =>
                       {
                           Interlocked.Increment(ref count);
                           lock (locker)
                           {
                               Console.WriteLine("Number of active threads:" + count);
                               Thread.Sleep(10);
                            }
                            Interlocked.Decrement(ref count);
                        }
                );
        }
    }
    

    如果我没有指定 MaxDegreeOfParallelism,控制台日志会显示多达 8 个任务同时运行。像这样:

    Number of active threads:6
    Number of active threads:7
    Number of active threads:7
    Number of active threads:7
    Number of active threads:7
    Number of active threads:7
    Number of active threads:6
    Number of active threads:7
    Number of active threads:7
    Number of active threads:7
    Number of active threads:7
    Number of active threads:7
    Number of active threads:7
    Number of active threads:7
    Number of active threads:7
    Number of active threads:7
    Number of active threads:7
    Number of active threads:7
    Number of active threads:7
    

    它开始较低,随着时间的推移而增加,最后它试图同时运行 8 个。

    如果我将其限制为某个任意值(比如 2),我会得到

    Number of active threads:2
    Number of active threads:1
    Number of active threads:2
    Number of active threads:2
    Number of active threads:2
    Number of active threads:2
    Number of active threads:2
    Number of active threads:2
    Number of active threads:2
    Number of active threads:2
    Number of active threads:2
    Number of active threads:2
    Number of active threads:2
    Number of active threads:2
    Number of active threads:2
    Number of active threads:2
    Number of active threads:2
    

    哦,这是在四核机器上。

    【讨论】:

    • 我的逻辑没有任何等待或任何 IO,它只是更新 SQL,是的,SQL 可能有它自己的,但主要是我在等待 SQL 完成。默认使用的最大活动线程数是多少?
    • 默认值为每个内核 2 个,但如果您的代码未使用 CPU,TPL 会引发此问题。大多数数据库都涉及一定数量的 IO。
    • 如果我的 6 核机器负载很重,它只使用 1 或 2 个线程。如果它的负载很轻,它会上升到 12。它的智能足以将现有系统负载考虑在内。
    • 只有在不涉及 IO 或您正在执行 CPU 密集型工作时才应使用 TPL
    【解决方案2】:

    例如,如果 CPU 有两个内核,则使用 20,如果 CPU 有四个内核,则使用 40?

    您可以这样做以使并行性取决于 CPU 内核的数量:

    var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 10 };
    Parallel.ForEach(sourceCollection, options, sourceItem =>
    {
        // do something
    });
    

    但是,较新的 CPU 倾向于使用超线程来模拟额外的内核。因此,如果您有一个四核处理器,那么Environment.ProcessorCount 可能会将其报告为 8 核。我发现,如果您将并行度设置为考虑模拟内核,那么它​​实际上会减慢 UI 线程等其他线程的速度。

    因此,尽管操作会更快完成,但在此期间应用程序 UI 可能会出现明显延迟。将 `Environment.ProcessorCount' 除以 2 似乎可以实现相同的处理速度,同时仍然保持 CPU 可用于 UI 线程。

    【讨论】:

    • 在我的 3950x 上,即使除以 2 也不足以保持 UI 线程处于活动状态。设置为超过 6 的任何值,并且 UI 在整个操作完成之前不会响应,这很烦人。即使是 6 也可能有点滞后。如果我们能像使用后台线程一样设置线程优先级就好了。
    【解决方案3】:

    听起来您正在并行运行的代码正在死锁,这意味着除非您能找到并解决导致该问题的问题,否则您根本不应该将其并行化。

    【讨论】:

    • -1,问题不在于并行还是不并行,很简单,SQL自己计算,但并行请求太多导致客户端超时,我想运行更少的操作。死锁不是问题,因为四核机器逻辑相同,SQL 运行良好,我不想继续增加超时。
    • 您是否尝试过增加超时并确认它有效?并发问题可能非常微妙,许多事情都可能导致它们看似随机地消失和重新出现。它在具有更多内核的不同机器上工作的事实并不意味着它没有损坏,或者更多的内核是有帮助的。
    • 增加超时确实有帮助。但是不知何故,小型机器上的 CPU 使用率超过 50%,而大型机器上的 CPU 使用率不到 5%,现在我需要找出性能问题,我可以做些什么来更改代码或只需要升级 CPU。
    【解决方案4】:

    其他需要考虑的事情,尤其是对于那些多年后才发现的人来说,取决于您的情况,通常最好将所有数据收集到一个 DataTable 中,然后在每个主要任务结束时使用 SqlBulkCopy。

    例如,我创建了一个处理数百万个文件的进程,当每个文件事务进行数据库查询以插入记录时,我遇到了相同的错误。相反,我将其全部存储在内存中的 DataTable 中,用于我迭代的每个共享,将 DataTable 转储到我的 SQL Server 并在每个单独的共享之间清除它。批量插入需要几秒钟的时间,并且不会同时打开数千个连接。

    编辑: 这是一个快速而肮脏的工作示例 SQLBulkCopy 方法:

    private static void updateDatabase(DataTable targetTable)
        {
            try
            {
                DataSet ds = new DataSet("FileFolderAttribute");
                ds.Tables.Add(targetTable);
                writeToLog(targetTable.TableName + " - Rows: " + targetTable.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                writeToLog(@"Opening SQL connection", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                Console.WriteLine(@"Opening SQL connection");
                SqlConnection sqlConnection = new SqlConnection(sqlConnectionString);
                sqlConnection.Open();
                SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConnection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null);
                bulkCopy.DestinationTableName = "FileFolderAttribute";
                writeToLog(@"Copying data to SQL Server table", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                Console.WriteLine(@"Copying data to SQL Server table");
                foreach (var table in ds.Tables)
                {
                    writeToLog(table.ToString(), logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    Console.WriteLine(table.ToString());
                }
                bulkCopy.WriteToServer(ds.Tables[0]);
    
                sqlConnection.Close();
                sqlConnection.Dispose();
                writeToLog(@"Closing SQL connection", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                writeToLog(@"Clearing local DataTable...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                Console.WriteLine(@"Closing SQL connection");
                Console.WriteLine(@"Clearing local DataTable...");
                targetTable.Clear();
                ds.Tables.Remove(targetTable);
                ds.Clear();
                ds.Dispose();
            }
            catch (Exception error)
            {
                errorLogging(error, getCurrentMethod(), logDatabaseFile);
            }
        }
    

    ...并将其转储到数据表中:

    private static void writeToDataTable(string ServerHostname, string RootDirectory, string RecordType, string Path, string PathDirectory, string PathFileName, string PathFileExtension, decimal SizeBytes, decimal SizeMB, DateTime DateCreated, DateTime DateModified, DateTime DateLastAccessed, string Owner, int PathLength, DateTime RecordWriteDateTime)
        {
            try
            {
                if (tableToggle)
                {
                    DataRow toInsert = results_1.NewRow();
                    toInsert[0] = ServerHostname;
                    toInsert[1] = RootDirectory;
                    toInsert[2] = RecordType;
                    toInsert[3] = Path;
                    toInsert[4] = PathDirectory;
                    toInsert[5] = PathFileName;
                    toInsert[6] = PathFileExtension;
                    toInsert[7] = SizeBytes;
                    toInsert[8] = SizeMB;
                    toInsert[9] = DateCreated;
                    toInsert[10] = DateModified;
                    toInsert[11] = DateLastAccessed;
                    toInsert[12] = Owner;
                    toInsert[13] = PathLength;
                    toInsert[14] = RecordWriteDateTime;
    
                    results_1.Rows.Add(toInsert);
                }
                else
                {
                    DataRow toInsert = results_2.NewRow();
                    toInsert[0] = ServerHostname;
                    toInsert[1] = RootDirectory;
                    toInsert[2] = RecordType;
                    toInsert[3] = Path;
                    toInsert[4] = PathDirectory;
                    toInsert[5] = PathFileName;
                    toInsert[6] = PathFileExtension;
                    toInsert[7] = SizeBytes;
                    toInsert[8] = SizeMB;
                    toInsert[9] = DateCreated;
                    toInsert[10] = DateModified;
                    toInsert[11] = DateLastAccessed;
                    toInsert[12] = Owner;
                    toInsert[13] = PathLength;
                    toInsert[14] = RecordWriteDateTime;
    
                    results_2.Rows.Add(toInsert);
                }
    
    
            }
            catch (Exception error)
            {
                errorLogging(error, getCurrentMethod(), logFile);
            }
        }
    

    ...这是上下文,循环片段本身:

    private static void processTargetDirectory(DirectoryInfo rootDirectory, string targetPathRoot)
        {
            DateTime StartTime = DateTime.Now;
            int directoryCount = 0;
            int fileCount = 0;
            try
            {                
                manageDataTables();
    
                Console.WriteLine(rootDirectory.FullName);
                writeToLog(@"Working in Directory: " + rootDirectory.FullName, logFile, getLineNumber(), getCurrentMethod(), true);
    
                applicationsDirectoryCount++;
    
                // REPORT DIRECTORY INFO //
                string directoryOwner = "";
                try
                {
                    directoryOwner = File.GetAccessControl(rootDirectory.FullName).GetOwner(typeof(System.Security.Principal.NTAccount)).ToString();
                }
                catch (Exception error)
                {
                    //writeToLog("\t" + rootDirectory.FullName, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
                    writeToLog("[" + error.Message + "] - " + rootDirectory.FullName, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
                    errorLogging(error, getCurrentMethod(), logFile);
                    directoryOwner = "SeparatedUser";
                }
    
                writeToRawLog(serverHostname + "," + targetPathRoot + "," + "Directory" + "," + rootDirectory.Name + "," + rootDirectory.Extension + "," + 0 + "," + 0 + "," + rootDirectory.CreationTime + "," + rootDirectory.LastWriteTime + "," + rootDirectory.LastAccessTime + "," + directoryOwner + "," + rootDirectory.FullName.Length + "," + DateTime.Now + "," + rootDirectory.FullName + "," + "", logResultsFile, true, logFile);
                //writeToDBLog(serverHostname, targetPathRoot, "Directory", rootDirectory.FullName, "", rootDirectory.Name, rootDirectory.Extension, 0, 0, rootDirectory.CreationTime, rootDirectory.LastWriteTime, rootDirectory.LastAccessTime, directoryOwner, rootDirectory.FullName.Length, DateTime.Now);
                writeToDataTable(serverHostname, targetPathRoot, "Directory", rootDirectory.FullName, "", rootDirectory.Name, rootDirectory.Extension, 0, 0, rootDirectory.CreationTime, rootDirectory.LastWriteTime, rootDirectory.LastAccessTime, directoryOwner, rootDirectory.FullName.Length, DateTime.Now);
    
                if (rootDirectory.GetDirectories().Length > 0)
                {
                    Parallel.ForEach(rootDirectory.GetDirectories(), new ParallelOptions { MaxDegreeOfParallelism = directoryDegreeOfParallelism }, dir =>
                    {
                        directoryCount++;
                        Interlocked.Increment(ref threadCount);
                        processTargetDirectory(dir, targetPathRoot);
                    });
    
                }
    
                // REPORT FILE INFO //
                Parallel.ForEach(rootDirectory.GetFiles(), new ParallelOptions { MaxDegreeOfParallelism = fileDegreeOfParallelism }, file =>
                {
                    applicationsFileCount++;
                    fileCount++;
                    Interlocked.Increment(ref threadCount);
                    processTargetFile(file, targetPathRoot);
                });
    
            }
            catch (Exception error)
            {
                writeToLog(error.Message, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
                errorLogging(error, getCurrentMethod(), logFile);
            }
            finally
            {
                Interlocked.Decrement(ref threadCount);
            }
    
            DateTime EndTime = DateTime.Now;
            writeToLog(@"Run time for " + rootDirectory.FullName + @" is: " + (EndTime - StartTime).ToString() + @" | File Count: " + fileCount + @", Directory Count: " + directoryCount, logTimingFile, getLineNumber(), getCurrentMethod(), true);
        }
    

    如上所述,这既快又脏,但效果很好。

    对于我遇到大约 2,000,000 条记录时遇到的与内存相关的问题,我必须创建第二个 DataTable 并在 2 个之间交替,在交替之间将记录转储到 SQL 服务器。所以我的 SQL 连接由每 100,000 条记录中的 1 条记录组成。

    我是这样处理的:

    private static void manageDataTables()
        {
            try
            {
                Console.WriteLine(@"[Checking datatable size] toggleValue: " + tableToggle + " | " + @"r1: " + results_1.Rows.Count + " - " + @"r2: " + results_2.Rows.Count);
                if (tableToggle)
                {
                    int rowCount = 0;
                    if (results_1.Rows.Count > datatableRecordCountThreshhold)
                    {
                        tableToggle ^= true;
                        writeToLog(@"results_1 row count > 100000 @ " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                        rowCount = results_1.Rows.Count;
                        logResultsFile = "FileServerReport_Results_" + DateTime.Now.ToString("yyyyMMdd-HHmmss") + ".txt";
                        Thread.Sleep(5000);
                        if (results_1.Rows.Count != rowCount)
                        {
                            writeToLog(@"results_1 row count increased, @ " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                            rowCount = results_1.Rows.Count;
                            Thread.Sleep(15000);
                        }
                        writeToLog(@"results_1 row count stopped increasing, updating database...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                        updateDatabase(results_1);
                        results_1.Clear();
                        writeToLog(@"results_1 cleared, count: " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    }
    
                }
                else
                {
                    int rowCount = 0;
                    if (results_2.Rows.Count > datatableRecordCountThreshhold)
                    {
                        tableToggle ^= true;
                        writeToLog(@"results_2 row count > 100000 @ " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                        rowCount = results_2.Rows.Count;
                        logResultsFile = "FileServerReport_Results_" + DateTime.Now.ToString("yyyyMMdd-HHmmss") + ".txt";
                        Thread.Sleep(5000);
                        if (results_2.Rows.Count != rowCount)
                        {
                            writeToLog(@"results_2 row count increased, @ " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                            rowCount = results_2.Rows.Count;
                            Thread.Sleep(15000);
                        }
                        writeToLog(@"results_2 row count stopped increasing, updating database...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                        updateDatabase(results_2);
                        results_2.Clear();
                        writeToLog(@"results_2 cleared, count: " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    }
                }
            }
            catch (Exception error)
            {
                errorLogging(error, getCurrentMethod(), logDatabaseFile);
            }
        }
    

    其中“datatableRecordCountThreshhold = 100000”

    【讨论】:

    • “未命名”:我添加了一些工作代码示例。代码可能不是最好的形式(风格,最佳实践),但我正在学习如何动态地做这些事情并且一切正常。如果重要的话,我正在针对 4.6.2 进行编译。
    【解决方案5】:

    它设置并行运行的线程数...

    【讨论】:

    • 但它是否考虑了核心?
    • 基本上你用的是什么数据库?
    • 相同的操作系统,相同的程序,相同的数据(基本上是复制器),但一台是双四核的高端机器,两台是简单的双核机器,相同的程序从其他服务器获取数据并存储数据返回 SQL(大量的 blob 和图像)。
    • 您尝试使用多少个并行线程?您使用的数据库平台是什么?
    • 绝对没有设置并行运行的线程数。在实践中,它可以指定并行运行的最大线程数,但这不是它的约定。它实际上限制的是并发运行的操作的数量,这可能等同于也可能不等同于线程,但这是一个抽象的实现细节。
    猜你喜欢
    • 2022-03-27
    • 1970-01-01
    • 2016-04-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-04-10
    • 2011-07-17
    相关资源
    最近更新 更多