【问题标题】:How to pause creating new threads until older threads have ended如何暂停创建新线程,直到旧线程结束
【发布时间】:2018-05-02 18:26:09
【问题描述】:

背景

我有一个非常大的二进制数据文件(20+ GB),我需要对其进行解析、处理数据,然后写入我的输出。我几乎没有处理如此大量数据的经验,虽然我在概念化如何处理它时遇到了一些麻烦,但我确实有一个想法。注意:输入数据包含许多从 IBM 大型机中检索到的记录,因此其格式如下:

每条记录(行/行)的前 4 个字节是 RDW(记录描述符字)。 RDW 包含记录的长度(包括 RDW)。由于 RDW,即使文件是一个恒定的字节流,我也知道每条记录的结束位置。我可以将此二进制文件转换为文本文件,将每两个字节转换为其十六进制表示,并在记录末尾包含一个换行符,但恐怕如果 20+ GB 二进制文件有多大翻译成这样。

因为我想将文件保留为二进制文件,所以我知道如何继续:

  1. 使用一个“主”线程按顺序读取文件。
  2. 一旦主服务器到达一条记录的末尾(使用在 RDW 中找到的信息),它就会产生一个新的“工作”线程,将它从文件中读取的数据传递给该线程。
    • 工作线程解析数据,处理数据,并将其输出到某处。 (我想我会将数据存放在 SQLite 数据库中。)
  3. 当工作线程工作时,主线程继续读取文件,当它完成读取另一条记录时,它会产生第二个工作线程来处理第二条记录。这种情况一直持续到处理完所有记录为止。

问题

不幸的是,我设想了一个问题。阅读“主”线程的工作速度将比它产生的线程快得多,我担心会创建太多线程。为了防止这种情况,我设想了这个解决方案(在伪代码中):

record = file.ReadRecord()
if(numberOfRunningWorkerThreads < MAX_THREADS)
    SpawnWorkerThread(record);
else
    WaitUntil(numberOfRunningWorkerThreads < MAX_THREADS)

但是,我不知道如何实现这样的功能,尤其是最后一个else 条件。我是多线程和异步的新手,我什至不确定这两个术语之间的区别是什么。

谁能指出我正确的方向?

【问题讨论】:

  • 为什么不让您的“主”线程将数据放入队列中。有一定数量的工作线程,每个工作线程都从队列中拉出。您必须确保队列是同步的,这样两个工作线程就不会出列相同的数据。

标签: c# multithreading file asynchronous


【解决方案1】:

我相信您正在寻找 Semaphore(或者 SemaphoreSlim 可能也适合您)。信号量“限制可以同时访问资源或资源池的线程数。”信号量是使用特定数量的插槽创建的。然后,您可以调用“WaitOne”来等待可用的插槽,并在使用完插槽后调用“释放”。如果没有可用的插槽,“WaitOne”可以永远等待,或者直到发生超时。

所以在你的情况下,主线程会调用 WaitOne 来等待一个可用的插槽。然后,在工作线程结束时,您可以调用 Release 来释放一个插槽。

.NET 信号量: https://msdn.microsoft.com/en-us/library/system.threading.semaphore(v=vs.110).aspx

.NET SemaphoreSlim(轻量级信号量): https://msdn.microsoft.com/en-us/library/system.threading.semaphoreslim(v=vs.110).aspx

【讨论】:

  • 这听起来正是我正在寻找的!非常感谢!在我阅读更多有关 Semaphore 和 SemaphoreSlim 的内容之前,我将推迟将此标记为答案。
  • @JDCAce,这正是您要求的目的,但这并不是解决您问题的最佳方法。创建和销毁线程是有成本的,如果您为文件中的每条记录创建和销毁一个线程,那么该成本可能超过您想要支付的费用。如果您希望不超过 N 个线程同时运行,那么您的程序应该预先创建 N 个线程,然后重新使用它们。这就是 线程池 为您所做的(参见user270576 的回答)。
  • 我对线程池的工作原理不太熟悉,但我的理解是它不适用于长时间运行的任务——这里可能会也可能不会。我的第二个想法是,如果主线程比工作线程快得多,一次可以排队多少个线程。我不确定它可以容纳的队列或资源是否有任何限制。
【解决方案2】:

解决方案 1:

使用ThreadPool。设置MaxThreads,其中

设置可以并发活动的线程池的请求数。在线程池线程可用之前,所有高于该数量的请求都会排队。

类似:

System.Threading.ThreadPool.SetMaxThreads(50, 1000);
// inside loop
ThreadPool.QueueUserWorkItem(ProcessRequest);
// end loop

ProcessRequest 是您的工作方法。

解决方案 2:

如果您知道记录数:使用 Parallel.For 并相应地设置 MaxDegreeOfParallelism

Parallel.For(0, 1000, new ParallelOptions { MaxDegreeOfParallelism = 10 },
i => { 
    ProcessRequest(i);
});

【讨论】:

  • Parallel.ForEach 也可能是一个选项,不需要知道记录的数量,只要它们可以创建一个逐条读取记录的枚举器。
  • @wiz 没错!
猜你喜欢
  • 1970-01-01
  • 2017-11-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多