【问题标题】:Parallel.ForEach no thread at the endParallel.ForEach 最后没有线程
【发布时间】:2012-05-14 13:04:12
【问题描述】:

我正在测试一个应该编译很多项目/文件的应用程序。

我有一个 ConucrrentBag,应该使用 Parallel 完成。

private readonly ConcurrentBag<string> m_files;

我对并行的呼吁是这样的:

Parallel.ForEach(m_files, new ParallelOptions
            {
                MaxDegreeOfParallelism = MaxProcesses,

            }, currFile => ProcessSingle(currFile.ToString()));

MaxProcess的数量是LogicalCpu*2。

当我编译 140 个项目时,到最后 Parallel 将启动线性较少的线程。至少最后 4 个项目只有一个线程在运行。这不是很好,但没关系。

现在我的问题:

当我编译大约 14000 多个项目时(它是 COBOL-SOURCE ;-) 和一个非常大的系统)最后一个模块不会被编译,因为 Parallel.ForEach 没有为此启动新线程。此时没有有效的工作线程。但是concurrentBag里还有140个项目。

有人知道如何解决这个问题吗?

编辑:只有在我运行编译器时才会出现这个问题。不运行编译器(为了更快的测试)它工作得很好......

编辑:

当我启动 Parallel.ForEach 进程时,ConcurrentBag 已经完全填满。

有关详细信息,请参阅 SingleProcess 中的代码:

private void ProcessSingle(string item)
        {
            Monitor.Enter(lockingObj);
            if (m_files.TryTake(out item))
            {
                if (CompilingModules <= 0)
                {
                    OnQueueStarted(new EventArgs());
                }
                CompilingModules++;
                Monitor.Exit(lockingObj);
                OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Done, ItemQueueObject.String));

                OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Dequeued, ItemQueueObject.String));
                using (CobolCompiler compiler = new CobolCompiler())
                {
                    compiler.OutputDataReceived += (sender, e) => OnOutputDataReceived(e);
                    compiler.Compile(item);
                    Thread.Sleep(2000);
                    if (compiler.LinkFailure)
                    {
                        if (ObjWithoutDll.ContainsKey(item))
                        {
                            if (ObjWithoutDll[item] <= 2)
                            {
                                m_files.Add(item);
                                OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Enqueued, ItemQueueObject.String));
                                ObjWithoutDll[item]++;
                            }
                            else
                            {
                                OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.LinkError, ItemQueueObject.String));
                                ObjWithoutDll.Remove(item);
                            }
                        }
                        else
                        {
                            ObjWithoutDll.Add(item, 0);
                            m_files.Add(item);
                            OnQueueItemStateChanged(new ItemQueueEventArgs(item, null, ItemQueueType.Enqueued, ItemQueueObject.String));
                        }
                    }
                    else
                    {
                        if (compiler.DllExisting)
                        {
                            ObjWithoutDll.Remove(item);
                        }
                        OnQueueItemStateChanged(compiler.DllExisting ? new ItemQueueEventArgs(item, null, ItemQueueType.Done, ItemQueueObject.String) : new ItemQueueEventArgs(item, null, ItemQueueType.Failed, ItemQueueObject.String));
                    }

                }

                Monitor.Enter(lockingObj);
                CompiledModules++;
                if (CompiledModules % 300 == 0)
                {
                    Thread.Sleep(60000);
                }
                CompilingModules--;
                if (CompilingModules <= 0 && m_files.Count <= 0)
                {

                    try
                    {
                        Process prReschk = new Process();
                        FileInfo batch = new FileInfo(@"batches\reschkdlg.cmd");
                        if (!batch.Exists)
                        {
                            Assembly _assembly = Assembly.GetExecutingAssembly();
                            StreamReader _textStreamReader = new StreamReader(_assembly.GetManifestResourceStream(@"Batches\reschkdlg.cmd"));
                        }

                        if (!File.Exists(Config.Instance.WorkingDir + @"reschkdlg.exe"))
                        {
                            File.Copy(Config.Instance.VersionExeDirectory + @"reschkdlg.exe", Config.Instance.WorkingDir + @"reschkdlg.exe");
                        }

                        prReschk.StartInfo.FileName = @"cmd.exe";
                        prReschk.StartInfo.Arguments = @"/c " + batch.FullName + " " + Config.Instance.Version.Replace(".", "") + " " + @"*" + " " + Config.Instance.WorkingDir;
                        prReschk.StartInfo.CreateNoWindow = true;
                        prReschk.StartInfo.UseShellExecute = false;
                        prReschk.Start();
                        prReschk.Close();
                        prReschk.Dispose();
                    }
                    catch
                    {
                    }

                    OnQueueFinished(new EventArgs());
                }
            }
            Monitor.Exit(lockingObj);
        }

这里是 CobolCompiler 类的 Codesn-p:

public void 编译(字符串文件) {

        file = file.ToLower();

        Process prCompile = new Process();
        Dir = Directory.CreateDirectory(c.WorkingDir + random.Next() + "\\");

        try
        {
            // First clean up the folder
            CleanUpFolder(true, file);

            // First set lock and copy all sources
            Monitor.Enter(lockingObj);
            if (filesToCopy == null)
            {
                CopySource(Dir.FullName);
            }
            Monitor.Exit(lockingObj);

            FileInfo batch = new FileInfo(@"batches\compile.cmd");
            if (!batch.Exists)
            {
                Assembly _assembly = Assembly.GetExecutingAssembly();
                StreamReader _textStreamReader = new StreamReader(_assembly.GetManifestResourceStream(@"Batches\compile.cmd"));
                _textStreamReader.Dispose();
            }

            prCompile.StartInfo.FileName = @"cmd.exe";
            prCompile.StartInfo.Arguments = @"/c " + batch.FullName + " " + c.Version.Replace(".", "") + " " + file.Remove(file.LastIndexOf('.')) + " " + Dir.FullName + " " + Dir.FullName.Remove(Dir.FullName.IndexOf(@"\"));
            prCompile.StartInfo.CreateNoWindow = true;
            prCompile.StartInfo.UseShellExecute = false;
            prCompile.StartInfo.RedirectStandardOutput = true;
            prCompile.StartInfo.RedirectStandardError = true;
            prCompile.StartInfo.WorkingDirectory = Assembly.GetExecutingAssembly().Location.Remove(Assembly.GetExecutingAssembly().Location.LastIndexOf("\\") + 1);
            prCompile.EnableRaisingEvents = true;
            prCompile.OutputDataReceived += prCompile_OutputDataReceived;
            prCompile.ErrorDataReceived += prCompile_OutputDataReceived;
            prCompile.Start();
            prCompile.BeginErrorReadLine();
            prCompile.BeginOutputReadLine();
            prCompile.WaitForExit();
            prCompile.Close();
            prCompile.Dispose();

            CleanUpFolder(false, file);

            if (File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".dll") || File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".exe"))
            {
                dllExisting = true;
                linkFailure = false;
            }
            else
            {
                if (File.Exists(Config.Instance.WorkingDir + file.Remove(file.LastIndexOf('.')) + ".obj"))
                {
                    linkFailure = true;
                }
                dllExisting = false;
            }



        }
        catch (ThreadAbortException)
        {
            if (prCompile != null)
            {
                // On Error kill process
                prCompile.Kill();
                prCompile.Dispose();
            }
        }
        catch (Win32Exception)
        {
        }
        catch (Exception)
        {
            dllExisting = false;
        }

        while (true)
        {
            try
            {
                if (Directory.Exists(Dir.FullName))
                {
                    Directory.Delete(Dir.FullName, true);
                    break;
                }
                else
                {
                    break;
                }
            }
            catch
            {
            }
        }


    }
private void CopySource(string Destination)
{
    filesToCopy = new StringCollection();
    foreach (string strFile in Directory.GetFiles(c.WorkingDir))
    {
        string tmpStrFile = strFile.ToLower();

        foreach (string Extension in c.Extensions)
        {
            if (tmpStrFile.Contains(Extension))
            {
                filesToCopy.Add(tmpStrFile);
            }
        }
    }

    if (filesToCopy.Count > 0)
    {
        foreach (string strFile in filesToCopy)
        {
            File.Copy(strFile, Destination + strFile.Remove(0, strFile.LastIndexOf("\\")));
        }
    }
}

private void CleanUpFolder(bool PreCleanup, string Filename)
{
    //Copy all files from compilationfolder to working directory
    if (!PreCleanup)
    {
        foreach (string strFile in Directory.GetFiles(Dir.FullName, Filename.Remove(Filename.LastIndexOf(".") + 1) + "*"))
        {
            FileInfo fileToMove = new FileInfo(strFile);

            if (fileToMove.Name.ToLower().Contains(Filename.Remove(Filename.LastIndexOf("."))))
            {
                File.Copy(strFile, c.WorkingDir + fileToMove.Name, true);
            }
        }
    }

    //Delete useless files
    foreach (string filename in Directory.GetFiles(Config.Instance.WorkingDir, Filename.Remove(Filename.LastIndexOf("."))+".*"))
    {
        bool foundExt = c.Extensions.Contains(filename.Remove(0, filename.LastIndexOf(".") + 1));
        if (PreCleanup)
        {
            // Only delete files, which are not won't be compiled
            if(!foundExt)
            {
                File.Delete(filename);
            }
        }
        else
        {
            if (!Config.Instance.SaveLspFile && filename.Contains(".lsp"))
            {
                File.Delete(filename);
            }

            if (!Config.Instance.SaveLstFile && filename.Contains(".lst"))
            {
                File.Delete(filename);
            }
        }
    }
}

public void Dispose()
{
    Dispose(true);
    GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
    if (!disposed)
    {
        if (disposing)
        {
            Dir = null;
        }
        disposed = true;
    }
}

~CobolCompiler()
{
    Dispose (false);
}

我只是在每次编译过程后休眠两秒钟。但这不会改变任何事情。

在编译过程中,CPU 为 100%。该应用程序正在收集 270 MB RAM。开始时只有 35MB。

别害怕,我必须将所有源代码复制到临时文件夹,因为编译器无法在同一个工作目录中同时编译多个文件。

编辑: 我已经解决了没有线程但仍然有项目的问题。

在 ProcessSingle 中,我添加了我尝试再次编译的项目,但它没有链接到 dll。

所以我从 14000 个项目开始,并在处理 Parallel.ForEach 时再次将项目(如果它们未能链接)添加到此 concurrentBag。所以我以 14000 次 ForEach 运行结束,并且有 xxx 模块必须再次编译。 :-(

我没看到。没有 WaitForExit 的 prReschk 的运行是有意的。因为检查超过 14000 个项目的资源需要很长时间,因此不应妨碍新的编译。

但是ConcurrentBag末尾线程少的问题依然存在:(但是只有在循环量大的时候才注意到。

【问题讨论】:

  • ProcessSingle() 是否会抛出您未处理的异常并中止整个运行?
  • ConcurrentBag 是否在 Parallel.ForEach() 之前完全填满?如果是,那么我们是普通的List&lt;&gt;。如果不是,那么这可能是错误的处理方式。
  • 无异常,都是在启动ProcessSingle()之前添加的...
  • 提示:将Monitor.Enter(lockingObj); ... Monitor.Exit(lockingObj); 替换为lock(lockingObj) { ... }。更容易阅读,更少担心。
  • 提示2:将ConcurrentBag&lt;&gt; 替换为List&lt;&gt;

标签: c# multithreading parallel.foreach


【解决方案1】:

Parallel.ForEach 方法将使用 .Net ThreadPool 来分配线程。并行运行的实际线程数将由 ThreadPool 控制,具体取决于系统 CPU 的负载。因此,您可能指定了 MaxDegreeOfParallelism,但这只是最大值,ThreadPool 可能会决定分配比该最大值更少的线程。

根据您在问题中提供的证据,在我看来,编译过程正在耗尽系统资源,而不是事后清理。这可以解释为什么 140 次编译最终导致分配的线程数量逐渐减少 - ThreadPool 没有分配新线程,因为它认为 CPU 负载很重。

我会更仔细地研究编译过程是如何终止的。 ProcessSingle 方法是否在编译完全完成之前返回?编译过程中是否存在内存泄漏?

作为一个实验,如果您在调用 ProcessSingle 后添加以下行,我很想知道它的行为是否有所不同:

 System.Threading.Thread.Sleep(2000);

这将暂停线程两秒钟,然后将控制权传回 ThreadPool 以分配下一个任务。如果它改善了您的应用程序的行为,那么它强烈表明我的理论是正确的。

【讨论】:

  • 我试过了..没有变化:(首先一切都很好......但到最后它最大限度地减少了线程数......当我编译大约 14000+ 时,它将只有 1 个线程留在最后 160 个,140 个后没有线程......:/ 当我编译 140 个模块时,它会删除最后 6 个模块上的线程......真的很奇怪......
  • 半神,感谢您发布其余代码,它更清楚地说明了您的情况。我没有可用的 COBOL 编译环境,因此无法测试您的代码,但通过阅读它,我注意到两个可能解释该行为的问题:
  • 1. prReschk.Start(); - 这之后应该是一个 WaitForExit 调用。我会仔细检查您的所有其他 Process.Start 调用,以确保您等待它们退出。 2.您似乎对读取 ConcurrentBag 的内容的机制感到困惑。 Parallel.ForEach 方法将遍历集合并将其中的每个成员传递给 Lambda 表达式。但是在 ProcessSingle 中,您首先使用“m_files.TryTake(out item)”从 ConcurrentBag 中获取另一个成员。删除这个 TryTake,这几乎肯定会给您带来一些问题,它可以解释您遇到的问题。
  • 如果这两个想法都不能解决问题,那么我将使用 Visual Studio 并行任务窗口来监控并行任务的行为。见msdn.microsoft.com/en-us/magazine/ee410778.aspx
【解决方案2】:

如果CopySource 抛出,那么你有一个未释放的锁lockingObj 并且无法取得进一步的进展。使用lock (lockingObj),它利用finally 块来释放锁。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-08-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多