【问题标题】:Thread pooling in C++11C++11 中的线程池
【发布时间】:2013-03-23 01:42:49
【问题描述】:

相关问题

关于 C++11:

关于 Boost:


我如何获得一个线程池将任务发送到,而不是一遍又一遍地创建和删除它们?这意味着持久线程无需加入即可重新同步。


我的代码如下所示:

namespace {
  std::vector<std::thread> workers;

  int total = 4;
  int arr[4] = {0};

  void each_thread_does(int i) {
    arr[i] += 2;
  }
}

int main(int argc, char *argv[]) {
  for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
      workers.push_back(std::thread(each_thread_does, j));
    }
    for (std::thread &t: workers) {
      if (t.joinable()) {
        t.join();
      }
    }
    arr[4] = std::min_element(arr, arr+4);
  }
  return 0;
}

我宁愿每次迭代都将任务发送到我的工作线程,并且只创建一次,而不是每次迭代都创建和加入线程。

【问题讨论】:

  • here 的相关问题和我的回答。
  • 考虑过使用 tbb(它是 Intel,但免费和开源,并且完全符合您的要求:您只需提交(递归可分)任务而不用担心线程)?
  • 这个 FOSS 项目是我尝试创建一个线程池库,如果你想看看。 -> code.google.com/p/threadpool11
  • 使用tbb有什么问题?

标签: c++ multithreading c++11 threadpool stdthread


【解决方案1】:

这是从我对另一个非常相似的帖子的回答中复制的:

  1. 从系统可以支持的最大线程数开始:

    int num_threads = std::thread::hardware_concurrency();
    
  2. 为了高效的线程池实现,一旦根据num_threads 创建线程,最好不要创建新线程,或者销毁旧线程(通过加入)。会有性能损失,甚至可能使您的应用程序运行速度比串行版本慢。

每个 C++11 线程都应该在其函数中无限循环地运行,不断等待新任务的抓取和运行。

下面是如何将这样的函数附加到线程池:

int num_threads = std::thread::hardware_concurrency();
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; i++)
{
    pool.push_back(std::thread(Infinite_loop_function));
}
  1. 无限循环功能。这是一个等待任务队列的while (true) 循环。
void Pool::Infinite_loop_function()
{
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            condition.wait(lock, [this](){
                return !queue.empty() || terminate_pool;
            });
            Job = queue.front();
            queue.pop();
        }

        Job(); // function<void()> type
    }
};
  1. 创建一个函数来将作业添加到您的队列中
void Pool::Add_Job(function<void()> New_Job)
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        queue.push(New_Job);
    }

    condition.notify_one();
}
  1. 将任意函数绑定到您的队列
Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));

集成这些成分后,您就拥有了自己的动态线程池。这些线程一直在运行,等待作业完成。

如果有一些语法错误,我深表歉意,我输入了这段代码,但我记性不好。抱歉,我无法为您提供完整的线程池代码;这会违反我的职业操守。

编辑:要终止池,请调用shutdown() 方法:

Pool::shutdown()
{
    {
        std::unique_lock<std::mutex> lock(threadpool_mutex);
        terminate_pool = true; // use this flag in condition.wait
    }

    condition.notify_all(); // wake up all threads.

    // Join all threads.
    for (std::thread &th : threads)
    {
        th.join();
    }

    pool.clear();  
    stopped = true; // use this flag in destructor, if not set, call shutdown() 
}

注意:使用匿名代码块时,在它们退出时,在其中创建的std::unique_lock 变量会超出范围,从而解锁互斥锁。

【讨论】:

  • 当thread(const thread&) = delete时你怎么有一个vector
  • @ChristopherPisz std::vector 不要求其元素是可复制的。您可以使用仅移动类型的向量(unique_ptrthreadfuture 等)。
  • 在shutdown()中,应该是thread_vector.clear();而不是 thread_vector.empty();对吗?
  • 当您终止并且没有剩余工作时会发生什么?
  • "Infinite_loop_function" 是一个从队列中消费任务并执行它们的函数的有趣名称。
【解决方案2】:

您可以使用 C++ 线程池库,https://github.com/vit-vit/ctpl

那么你写的代码可以替换成下面的代码

#include <ctpl.h>  // or <ctpl_stl.h> if ou do not have Boost library

int main (int argc, char *argv[]) {
    ctpl::thread_pool p(2 /* two threads in the pool */);
    int arr[4] = {0};
    std::vector<std::future<void>> results(4);
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            results[j] = p.push([&arr, j](int){ arr[j] +=2; });
        }
        for (int j = 0; j < 4; ++j) {
            results[j].get();
        }
        arr[4] = std::min_element(arr, arr + 4);
    }
}

您将获得所需数量的线程,并且不会在迭代中一遍又一遍地创建和删除它们。

【讨论】:

  • 这应该是答案;单头、可读、直接、简洁且符合标准的 C++11 库。干得好!
  • @vit-vit 你能举个例子吗?你如何在results[j] = p.push([&amp;arr, j](int){ arr[j] +=2; }); 推送一个类成员函数
  • @HaniGoc 通过引用捕获实例即可。
  • @vit-vit 向您发送拉取请求以改进 STL 版本。
  • @vit-vit:很难联系到那个库的维护者有问题,提示提示。
【解决方案3】:

线程池意味着您的所有线程一直在运行——换句话说,线程函数永远不会返回。为了让线程做一些有意义的事情,你必须设计一个线程间通信系统,既是为了告诉线程有事要做,也是为了传达实际的工作数据。

通常这将涉及某种并发数据结构,并且每个线程可能会在某种条件变量上休眠,当有工作要做时会收到通知。收到通知后,一个或多个线程唤醒,从并发数据结构中恢复任务,处理它,并以类似的方式存储结果。

然后线程会继续检查是否还有更多工作要做,如果没有,则返回睡眠。

结果是您必须自己设计这一切,因为没有普遍适用的“工作”的自然概念。这是一项相当多的工作,并且您必须解决一些微妙的问题。 (如果您喜欢在后台为您处理线程管理的系统,您可以使用 Go 进行编程。)

【讨论】:

  • “你必须自己设计这一切”
  • @Yktula:嗯,这是一项非常重要的任务。从您的帖子中甚至不清楚您想要完成什么样的工作,而这对于解决方案至关重要。你可以用 C++ 实现 Go,但这将是一个非常具体的事情,一半的人会抱怨他们想要不同的东西。
【解决方案4】:

线程池的核心是一组线程,它们都绑定到作为事件循环工作的函数。这些线程将无休止地等待一个任务被执行,或者它们自己的终止。

线程池作业是提供一个提交作业的接口,定义(也可能是修改)运行这些作业的策略(调度规则、线程实例化、池大小),并监控线程和相关资源的状态.

因此,对于多功能池,必须首先定义任务是什么,它是如何启动、中断的,结果是什么(请参阅该问题的 promise 和 future 的概念),线程将发生什么样的事件必须响应,他们将如何处理它们,如何将这些事件与任务处理的事件区分开来。如您所见,这可能会变得非常复杂,并且随着解决方案变得越来越复杂,会对线程的工作方式施加限制。

当前处理事件的工具相当简单(*):诸如互斥锁、条件变量之类的原语,以及在此之上的一些抽象(锁、屏障)。但在某些情况下,这些抽象可能会变得不合适(请参阅相关的question),必须恢复使用原语。

还需要处理其他问题:

  • 信号
  • i/o
  • 硬件(处理器关联、异构设置)

这些将如何在您的环境中发挥作用?

This answer 类似的问题指向一个用于 boost 和 stl 的现有实现。

我为另一个问题提供了一个线程池的very crude implementation,它并没有解决上面列出的许多问题。你可能想建立在它之上。您可能还想看看其他语言的现有框架,以寻找灵感。


(*) 我不认为这是一个问题,恰恰相反。我认为这正是继承自 C 的 C++ 精神。

【讨论】:

  • “此答案”链接指向该问题,我没有找到您所指的答案。
【解决方案5】:
Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:

function_pool.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>

class Function_pool
{

private:
    std::queue<std::function<void()>> m_function_queue;
    std::mutex m_lock;
    std::condition_variable m_data_condition;
    std::atomic<bool> m_accept_functions;

public:

    Function_pool();
    ~Function_pool();
    void push(std::function<void()> func);
    void done();
    void infinite_loop_func();
};

function_pool.cpp

#include "function_pool.h"

Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}

Function_pool::~Function_pool()
{
}

void Function_pool::push(std::function<void()> func)
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_function_queue.push(func);
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    lock.unlock();
    m_data_condition.notify_one();
}

void Function_pool::done()
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_accept_functions = false;
    lock.unlock();
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    m_data_condition.notify_all();
    //notify all waiting threads.
}

void Function_pool::infinite_loop_func()
{
    std::function<void()> func;
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(m_lock);
            m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
            if (!m_accept_functions && m_function_queue.empty())
            {
                //lock will be release automatically.
                //finish the thread loop and let it join in the main thread.
                return;
            }
            func = m_function_queue.front();
            m_function_queue.pop();
            //release the lock
        }
        func();
    }
}

main.cpp

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>

Function_pool func_pool;

class quit_worker_exception : public std::exception {};

void example_function()
{
    std::cout << "bla" << std::endl;
}

int main()
{
    std::cout << "stating operation" << std::endl;
    int num_threads = std::thread::hardware_concurrency();
    std::cout << "number of threads = " << num_threads << std::endl;
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < num_threads; i++)
    {
        thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
    }

    //here we should send our functions
    for (int i = 0; i < 50; i++)
    {
        func_pool.push(example_function);
    }
    func_pool.done();
    for (unsigned int i = 0; i < thread_pool.size(); i++)
    {
        thread_pool.at(i).join();
    }
}

【讨论】:

  • 谢谢!这确实帮助我开始使用并行线程操作。我最终使用了您的实现的略微修改版本。
  • 你不需要 m_accept_functions 是原子类型。 m_accept_functions 受互斥体保护。
  • 可以调用join()就好了
【解决方案6】:

您可以使用 boost 库中的 thread_pool

void my_task(){...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    boost::asio::thread_pool pool(threadNumbers);

    // Submit a function to the pool.
    boost::asio::post(pool, my_task);

    // Submit a lambda object to the pool.
    boost::asio::post(pool, []() {
      ...
    });
}

您也可以使用来自开源社区的threadpool

void first_task() {...}    
void second_task() {...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    pool tp(threadNumbers);

    // Add some tasks to the pool.
    tp.schedule(&first_task);
    tp.schedule(&second_task);
}

【讨论】:

    【解决方案7】:

    这样的事情可能会有所帮助(取自一个正在运行的应用程序)。

    #include <memory>
    #include <boost/asio.hpp>
    #include <boost/thread.hpp>
    
    struct thread_pool {
      typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
    
      thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
        for (int i = 0; i < threads; ++i) {
          auto worker = [this] { return service.run(); };
          grp.add_thread(new boost::thread(worker));
        }
      }
    
      template<class F>
      void enqueue(F f) {
        service.post(f);
      }
    
      ~thread_pool() {
        service_worker.reset();
        grp.join_all();
        service.stop();
      }
    
    private:
      boost::asio::io_service service;
      asio_worker service_worker;
      boost::thread_group grp;
    };
    

    你可以这样使用它:

    thread_pool pool(2);
    
    pool.enqueue([] {
      std::cout << "Hello from Task 1\n";
    });
    
    pool.enqueue([] {
      std::cout << "Hello from Task 2\n";
    });
    

    请记住,重新发明一个高效异步排队机制并非易事。

    Boost::asio::io_service 是一种非常高效的实现,或者实际上是特定于平台的包装器的集合(例如,它包装了 Windows 上的 I/O 完成端口)。

    【讨论】:

    • C++11 需要这么大的提升吗?比如说,std::thread 不够吗?
    • std 中没有对应的 boost::thread_groupboost::thread_groupboost::thread 实例的集合。但当然,用std::threads 的vector 替换boost::thread_group 非常容易。
    【解决方案8】:

    编辑:这现在需要 C++17 和概念。 (截至 2016 年 9 月 12 日,只有 g++ 6.0+ 就足够了。)

    因此,模板推导更加准确,因此值得努力获得更新的编译器。我还没有找到需要显式模板参数的函数。

    它现在还接受任何适当的可调用对象(并且仍然是静态类型安全的!!!)。

    它现在还包括一个使用相同 API 的可选绿色线程优先级线程池。不过,此类仅适用于 POSIX。它使用ucontext_t API 进行用户空间任务切换。


    我为此创建了一个简单的库。下面给出一个使用示例。 (我之所以回答这个问题,是因为这是我在决定有必要自己编写之前发现的事情之一。)

    bool is_prime(int n){
      // Determine if n is prime.
    }
    
    int main(){
      thread_pool pool(8); // 8 threads
    
      list<future<bool>> results;
      for(int n = 2;n < 10000;n++){
        // Submit a job to the pool.
        results.emplace_back(pool.async(is_prime, n));
      }
    
      int n = 2;
      for(auto i = results.begin();i != results.end();i++, n++){
        // i is an iterator pointing to a future representing the result of is_prime(n)
        cout << n << " ";
        bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
        if(prime)
          cout << "is prime";
        else
          cout << "is not prime";
        cout << endl;
      }  
    }
    

    您可以传递async 任何具有任何(或无效)返回值和任何(或没有)参数的函数,它将返回相应的std::future。要获得结果(或等到任务完成),您可以在未来致电 get()

    这里是 github:https://github.com/Tyler-Hardin/thread_pool

    【讨论】:

    • 看起来很棒,但如果能与 vit-vit 的标题进行比较就更好了!
    • @Sh3ljohn,乍一看,似乎它们在 API 中基本相同。 vit-vit 使用 boost 的无锁队列,比我的要好。 (但我的目标是专门用 std::* 来做。我想我可以自己实现无锁队列,但这听起来很难而且容易出错。)此外,vit-vit 没有关联的 .cpp,它对于不知道自己在做什么的人来说,使用起来更简单。 (例如github.com/Tyler-Hardin/thread_pool/issues/1
    • 他/她也有一个 stl-only 解决方案,我在过去几个小时里一直在分叉,起初它看起来比你的更复杂,到处都是共享指针,但这实际上是需要正确处理热调整大小。
    • @Sh3ljohn,啊,我没有注意到热调整大小。那很好。我选择不担心它,因为它实际上不在预期的用例中。 (我想不出我个人想要调整大小的情况,但这可能是由于缺乏想象力。)
    • 用例示例:您在服务器上运行 RESTful API,需要临时减少资源分配以进行维护,而无需完全关闭服务。
    【解决方案9】:

    看起来线程池是非常流行的问题/练习:-)

    我最近用现代 C++ 写了一个;它归我所有,可在此处公开获取 - https://github.com/yurir-dev/threadpool

    它支持模板化返回值、核心固定、某些任务的排序。 所有实现都在两个 .h 文件中。

    所以,原来的问题是这样的:

    #include "tp/threadpool.h"
    
    int arr[5] = { 0 };
    
    concurency::threadPool<void> tp;
    tp.start(std::thread::hardware_concurrency());
    
    std::vector<std::future<void>> futures;
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            futures.push_back(tp.push([&arr, j]() {
                   arr[j] += 2;
                }));
        }
    }
    
    // wait until all pushed tasks are finished.
    for (auto& f : futures)
        f.get();
    // or just tp.end(); // will kill all the threads
    
    arr[4] = *std::min_element(arr, arr + 4);
    

    【讨论】:

    • 在引用自己的内容时请务必阅读Stack Overflow's self-promotion policy
    • @JeremyCaney 那有什么问题?他不卖任何东西,只是展示他公开可用的 FOSS 库。
    • @original.roland:如果您对自我推销规则有任何疑问,我建议您在 Meta Stack Exchange 上提出。
    • @JeremyCaney 我对自我推销规则没有任何疑问,我对此完全满意,只是看不出这个答案会如何违反任何规则。还是你只是想随机记住 yuir 来阅读政策?
    • @original.roland:至少,在引用他们自己的内容时,他们应该承认这是他们自己的内容。在这种情况下,这并不是什么大问题,而且很容易解决,这就是为什么我在没有例如的情况下提醒他们该政策的原因。标记答案。我的假设只是他们不知道这项政策。但是,他们应该edit 回答以确认他们是链接存储库的所有者。
    猜你喜欢
    • 2013-07-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-01-25
    • 2015-07-15
    • 1970-01-01
    • 2020-04-24
    • 1970-01-01
    相关资源
    最近更新 更多