【问题标题】:Should I use a ConcurrentQueue this way or individual threads我应该以这种方式使用 ConcurrentQueue 还是使用单个线程
【发布时间】:2014-07-22 14:02:23
【问题描述】:

我正在做相当于美化邮件合并,然后将文件转换为 PDF...基于 .Net 4.5,我看到了几种可以进行线程处理的方法。使用线程安全队列的那个似乎很有趣(计划 A),但我可以看到一个潜在的问题。你怎么看?我会尽量保持简短,但要填写需要的内容。

这是假设进行数据库处理比 PDF 转换花费更多的时间。

在这两种情况下,每个文件的数据库处理都在其自己的线程/任务中完成,但 PDF 转换可以在许多单线程/任务中完成(计划 B),也可以在单个长时间运行的线程中完成(计划A)。这是我想知道的PDF转换。这一切都在 try/catch 语句中,但该线程不得失败或全部失败(计划 A)。你认为这是个好主意吗?任何建议将不胜感激。

/* A class to process a file: */ 
public class c_FileToConvert
{
    public string InFileName { get; set; }
    public int FileProcessingState { get; set; }
    public string ErrorMessage { get; set; }
    public List<string> listData = null;
    c_FileToConvert(string inFileName)
    {
        InFileName = inFileName;
        FileProcessingState = 0;
        ErrorMessage = ""; // yah, yah, yah - String.Empty
        listData = new List<string>();
    }   
    public void doDbProcessing()
    {
        // get the data from database and put strings in this.listData
        DAL.getDataForFile(this.InFileName, this.ErrorMessage); // static function
        if(this.ErrorMessage != "")
            this.FileProcessingState = -1; //fatal error
        else // Open file and append strings to it
        {  
            foreach(string s in this.listData}
                ...
            FileProcessingState = 1; // enum DB_WORK_COMPLETE ...
         }
    }   
    public void doPDFProcessing()
    {
        PDFConverter cPDFConverter = new PDFConverter();
        cPDFConverter.convertToPDF(InFileName, InFileName + ".PDF");
        FileProcessingState = 2; // enum PDF_WORK_COMPLETE ...
    }       
}

/*** These only for Plan A ***/
public ConcurrentQueue<c_FileToConvert> ConncurrentQueueFiles = new ConcurrentQueue<c_FileToConvert>(); 
public bool bProcessPDFs;   

public void doProcessing() // This is the main thread of the Windows Service 
{
    List<c_FileToConvert> listcFileToConvert = new List<c_FileToConvert>();

    /*** Only for Plan A ***/
    bProcessPDFs = true;
    Task task1 = new Task(new Action(startProcessingPDFs)); // Start it and forget it
    task1.Start();

    while(1 == 1)
    {
        List<string> listFileNamesToProcess = new List<string>();
        DAL.getFileNamesToProcessFromDb(listFileNamesToProcess);

        foreach(string s in listFileNamesToProcess)
        {
            c_FileToConvert cFileToConvert = new c_FileToConvert(s);
            listcFileToConvert.Add(cFileToConvert);
        }       

        foreach(c_FileToConvert c in listcFileToConvert)
            if(c.FileProcessingState == 0)
                Thread t = new Thread(new ParameterizedThreadStart(c.doDbProcessing));

        /** This is Plan A - throw it on single long running PDF processing thread **/
        foreach(c_FileToConvert c in listcFileToConvert)
            if(c.FileProcessingState == 1)
                ConncurrentQueueFiles.Enqueue(c);

        /*** This is Plan B - traditional thread for each file conversion ***/              
        foreach(c_FileToConvert c in listcFileToConvert)
            if(c.FileProcessingState == 1)
                Thread t = new Thread(new ParameterizedThreadStart(c.doPDFProcessing));

        int iCount = 0;
        for(int iCount = 0; iCount < c_FileToConvert.Count; iCount++;)
        {
            if((c.FileProcessingState == -1) || (c.FileProcessingState == 2))
            {
                DAL.updateProcessingState(c.FileProcessingState)
                listcFileToConvert.RemoveAt(iCount);
            }
        }
        sleep(1000);
    }
}   
public void startProcessingPDFs() /*** Only for Plan A ***/
{
    while (bProcessPDFs == true)
    {
        if (ConncurrentQueueFiles.IsEmpty == false)
        {
            try
            {
            c_FileToConvert cFileToConvert = null;
            if (ConncurrentQueueFiles.TryDequeue(out cFileToConvert) == true)
                cFileToConvert.doPDFProcessing();
            }
            catch(Exception e)
            {
                cFileToConvert.FileProcessingState = -1;
                cFileToConvert.ErrorMessage = e.message;
            }
        }
    }
}

Plan A 似乎是一个不错的解决方案,但如果 Task 以某种方式失败怎么办?是的,PDF 转换可以用单独的线程来完成,但我想保留它们用于数据库处理。

这是在文本编辑器中编写的,是我能写的最简单的代码,所以可能有些东西,但我想我明白了。

【问题讨论】:

  • PDF转换CPU或IO是否受限?
  • 它应该是 CPU 绑定的。每个文档大约需要一秒钟。我的假设是我的瓶颈将在数据库中
  • 您需要知道哪些资源限制了性能。可以很好地利用所有 CPU 资源(内核)的并发方法不适用于 IO(例如,您最终只会在线程之间分配相同的 IO 总量:每次转换需要更长的时间,总吞吐量几乎没有变化) .
  • 好的,澄清一下,我确信瓶颈将是数据库。测试表明,PDF 文件转换通常需要不到一秒的最大值。数据库访问至少运行大约 20 秒,并且预计会更高。真的,我的问题是关于使用 ConcurrentQueue 来提供单个线程或使用单个线程。谢谢,M
  • 不要像那样直接使用ConcurrentQueue,使用BlockingCollection,它默认使用ConcurrentQueue作为内部存储。它使您的 ConncurrentQueueFiles.TryDequeue 循环更加高效,因为当队列中没有任何内容要处理时,它们将阻塞而不是旋转。

标签: c# multithreading concurrent-queue


【解决方案1】:

您正在处理多少个文件? 10? 100,000?如果数量很大,使用 1 个线程为每个文件运行数据库查询不是一个好主意。

线程是一个非常低级的控制流结构,我建议您尽量避免在应用程序代码中出现大量混乱而详细的线程生成、连接、同步等。如果可以的话,尽量保持简单。

如何:将每个文件所需的数据放入线程安全队列中。为结果创建另一个线程安全队列。产生一些线程,这些线程重复地从输入队列中拉出项目,运行查询,转换为 PDF,然后将输出推入输出队列。线程应该只共享输入和输出队列。

您可以选择任意数量的工作线程,或者尝试看看什么是好的数量。不要为每个文件创建 1 个线程 - 只需选择一个允许良好 CPU 和磁盘利用率的数字。

或者,如果您的语言/库有并行映射运算符,请使用它。它将为您省去很多麻烦。

【讨论】:

  • 我计划让 Db 查询拥有自己的线程,这就是为什么我只想使用一个线程进行 PDF 转换
  • 如果你愿意,你可以这样做,但我要重申:每个文件一个线程不是一个好主意。最好选择一些允许良好资源利用的线程,并使用队列在线程之间进行负载平衡。更好的是,如果您可以使用类似并行映射或 map-reduce 实用程序的东西,那么您根本不必处理生成线程的细节。
猜你喜欢
  • 2014-03-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-01-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多