【问题标题】:Implementing a simple thread pool实现一个简单的线程池
【发布时间】:2012-03-23 18:07:24
【问题描述】:

我目前需要一个简单高效的线程池实现。我在这里和谷歌上搜索过,发现了许多有趣的链接,但到目前为止我发现的似乎都没有合适的。我在网上找到的大多数实现要么太复杂,要么缺少我需要的一些关键功能。

另外我不想使用我不理解的代码,所以我决定自己编写代码(有时重新发明轮子有助于我在知识和经验方面推动自己前进)。我当然了解线程池背后的基本思想,但一些实现细节对我来说还是有些不清楚。这可能是因为我需要的那种线程池有点特殊。让我描述一下。我有一项在特定(大)缓冲区上完成数十万次的任务。我已经测量过,如果我为这个任务使用线程,性能会好得多——缓冲区被分成子缓冲区,每个线程在子缓冲区上执行它的任务并返回结果。然后将所有线程的所有结果加在一起,给我最终的解决方案。

但是,由于经常这样做,我正在失去宝贵的时间,因为创建了这么多线程(因为线程创建带来的开销)。所以我想要一个线程池来执行这个任务,而不是每次都创建一组新的线程。

更清楚地说,这是我目前所拥有的:

  • 将缓冲区分成N个大小相同的子缓冲区
  • 为每个子缓冲区创建一个线程并在子缓冲区上运行它
  • 等待所有线程完成 (WaitForMultipleObjects),将结果相加并销毁线程
  • 重复

我想要实现的是:

  • 将缓冲区分成N个大小相同的子缓冲区
  • 将每个子缓冲区分配给线程池中的一个线程(恰好有 N 个线程)
  • 一旦线程完成,让它休眠,直到另一个任务准备好
  • 当所有线程都完成(并休眠)后,将它们产生的结果相加
  • 通过唤醒线程并为其分配新任务来重复

如您所见,这是一个特殊的线程池,因为我需要等待线程完成。基本上我想摆脱创建线程的开销,因为程序经历了数十万次迭代,因此它可以在其生命周期内创建和销毁数百万个线程。好消息是我根本不需要线程之间的任何同步,它们都有自己的数据和存储位置来获得结果。但是我必须等到所有线程都完成并且我有最终的解决方案,因为下一个任务取决于上一个任务的结果。

我的主要问题是线程管理:

  • 如何让我的线程“休眠”并在新任务准备好后唤醒它们?
  • 如何等待所有线程完成?

如果有任何帮助,我将不胜感激。如果我不够清楚,也可以随时提问。谢谢!

【问题讨论】:

  • 我看不出它有什么特别之处,阻塞线程完成是完全正常的。操作系统实现的线程池可以正常工作。使用 QueueUserWorkItem()、CreateEvent 和 SetEvent 进行同步。
  • @HansPassant:嗨,谢谢你的建议。您能否将其发布为包含更多详细信息的答案?你说的功能我不熟悉。
  • 与 WaitForMultipleObjects() 相同的地方,使用 MSDN 库。

标签: c++ windows multithreading threadpool


【解决方案1】:

如何让我的线程“休眠”并在新任务完成后唤醒它们 准备好了吗?

您可以使用互斥锁或信号量,具体取决于您的情况(在某些情况下,您可能需要使用条件变量或自动/手动重置事件)来使线程等待彼此或唤醒当事情发生时。

如何等待所有线程完成?

您必须在每个线程上使用join() 来等待它完成。因此,如果您有一组线程,您可能希望在每个仍在运行的线程上执行并调用 join。

另外说明:线程池确实已经存在,您可以只使用 Boost.Threadpool 之类的东西,而不是重新发明轮子。

【讨论】:

  • 请问您为什么更喜欢互斥锁和信号量而不是条件变量?我想我在某处看到过,但我不确定这是否是好的做法。
  • @J.N.我从未见过或读过任何互斥锁或信号量是坏主意的东西。
  • 当用作互斥体或信号量时,我也不是。另一方面,互斥量和信号量并不是为了阻止线程等待其他人完成任务,它们是为了共享对资源的访问。
  • @J.N.是的,你有一个正确的观点,我想自动或手动重置事件可能更适合线程等待
  • @J.N. “另一方面,互斥量和信号量并不是为了阻止线程等待其他人完成任务,它们是为了共享对资源的访问”——任务队列是一种资源,可以通过互斥量来防止多次访问.队列中的任务以及它们的数量是一个计数资源,即。只是信号量应该管理的事情。这是使用不恰当的事件 - 事件没有计数。
【解决方案2】:

'正如你所见,这是一个特殊的线程池,因为我需要等待线程完成。' - 不完全的。您希望处理作业中最后一个任务的线程提供作业完成通知。完成通知是 theadPool 的正常功能,否则发起线程将无法处理完整的结果集。池通常同时处理多个任务/任务层次结构,因此完成通知方法应该与线程无关 - 没有 join() 或任何此类东西。此外,没有 WaitForMultipleObject() - 使用难以管理且限制为 64 个对象的同步对象数组。

线程池通常有一个线程池在生产者-消费者队列中等待任务。这些任务通常继承自一些提供线程池服务的“Ctask”类。完成检测和通知机制就是其中之一。

生产者-消费者队列本质上是一个“普通”队列类,可以防止由互斥体进行多次访问,并带有用于计算队列中的任务和等待线程的信号量。池线程每个都通过这个队列,它们永远循环,等待队列信号量,然后从锁定的队列中弹出任务,并调用接收到的任务的 run() 方法。

每个任务在提交到池时都会将线程池作为数据成员加载。这允许任务在需要时提交更多任务。

通常通过调用作为任务成员的事件方法在某处通知每个任务的完成,该事件方法在任务提交到池之前由原始线程加载。

一个任务还应该有一个子任务原子倒计时整数和一个等待其他任务完成的事件。

这在您的示例中如何工作?您可以有一个“主要”任务来提交数组处理任务并等待它们全部完成。

池中的线程数应该多于内核数。我建议加倍。

需要拆分数组,以便为​​每个部分使用单独的任务。多少任务 - 足以使可用内核全部用完,但又不会太多以至于产生过多的上下文切换。对于任何合理大小的数组,假设 64 个任务是一个合理的拆分 - 比可用处理器的典型数量多。此外,任务不应按顺序拆分以避免错误共享。

所以,这个“主要”数组处理任务。使用数组引用加载它并将其完成事件设置为指向某个发出事件信号的方法。将任务提交到池中,等待事件。

任务被加载到一个线程上。它的 run() 方法使用两个循环并创建 32 个数组处理任务,每个任务都有自己的起始索引和数组长度,但起始索引不连续。该任务使用自己继承的 submit() 方法将 32 个新任务中的每一个加载到池中。除了实际将任务排队以在线程上执行外,此 submit() 还原子增量完成计数整数,并将任务的完成事件设置为在任务排队之前的私有完成事件。私有完成事件原子递减完成计数并在为零时发出事件信号。提交所有 32 个数组处理事件后,主任务等待私有完成事件。

因此,32 个数组处理任务在线程上运行。当每个完成时,运行它的线程调用其完成事件,该事件减少主任务中的完成计数整数。最终,最后一个数组处理任务完成,完成计数整数减为零,因此发出运行主任务的线程正在等待的事件。主任务调用自己的完成事件,因此发出主任务发起者正在等待的事件。

主任务发起者在阵列完全处理后继续运行。

..或者,您可以按照其他人的建议使用已经工作的 threadPool 类。

【讨论】:

    【解决方案3】:

    对我来说,与线程通信的首选方式是通过条件变量。因为您可以定义所需的条件并在它发生变化时发出信号。在您的情况下,您可以将它与传递子缓冲区的队列组合,这样每个线程都会在队列为空时等待。 然后可以将结果放在另一个队列中,其中管理队列正在等待,直到所有线程都将结果发布到队列中(对该队列的引用作为请求与子缓冲区一起传递) .

    【讨论】:

      【解决方案4】:

      您是否查看过其他线程池实现?比如http://threadpool.sourceforge.net/。你想要完成的并不是全新的。使线程等待新任务的一种方法是阻塞互斥体,并在另一个任务准备好时解除阻塞该互斥体。您还可以使用从线程返回父线程的某种通知让线程通知它们已完成。

      在我的工作中,我一直在大量使用线程池/线程,并且一直在使用 ØMQ 进行跨线程通信,这允许线程在准备好进行新工作时阻塞来自 ØMQ 的 read() 请求。

      通过一些研究并花费一点时间和精力,您应该能够弄清楚如何构建或利用现有框架/工具来构建您需要的东西。然后,当您遇到一些代码问题时,您可以回到 SO。

      【讨论】:

        猜你喜欢
        • 2012-10-24
        • 1970-01-01
        • 1970-01-01
        • 2011-11-28
        • 1970-01-01
        • 1970-01-01
        • 2012-09-03
        • 2011-05-09
        • 1970-01-01
        相关资源
        最近更新 更多