【问题标题】:Thread Pool C++线程池 C++
【发布时间】:2017-08-11 09:01:48
【问题描述】:

我有以下for 循环:

for (int i = 0; i < 100; i++) {
    someJob();
}

我只想用 5 个线程运行这个函数,我该怎么做?
我不能使用任何外部库。

我尝试过创建一个包含 5 个线程的数组,如果索引等于 5,则等待所有线程并再次执行此操作,但我确信还有另一种方法:

std::thread t[THREAD_COUNT];
int j=0;

for (int i = 0; i < 100; i++) {
    t[j++] = std::thread(someJob);
    if (j == THREAD_COUNT)
    {
        for (int k = 0; k < THREAD_COUNT; k++)
        {
            if (t[k].joinable())
                t[k].join();
        }
        j = 0;
    }
}

有什么建议吗? (我不能使用 boost)

谢谢!

【问题讨论】:

  • 是std::thread
  • 使用一个函数启动每个线程,该函数在循环中调用someJob 20 次。不是最好的方法,而是最简单的方法。否则使用具有 5 个线程的线程池从队列中拉取作业,包含 someJob 100 次。
  • 我不想将此添加到我的答案中,但您也可以使用我自己的库 concurrencpp,它保证 concurencpp::async 在线程池上运行:github.com/David-Haim/concurrencpp
  • 这并没有解决问题,但您不需要测试这些线程是否可连接。你没有分离它们,所以它们是可连接的。

标签: c++ multithreading


【解决方案1】:

您可以创建一个函数来测试您的线程数组以找到一个空线程来运行每个连续的作业。像这样的:

// synchronized output to prevent interleaving of results
#define sync_out(m) do{std::ostringstream o; o << m << '\n'; std::cout << o.str();}while(0)

void someJob(int id)
{
    sync_out("thread: " << id);
}

template<typename Job>
void start_thread(std::vector<std::thread>& threads, Job&& job)
{
    // find an ended thread
    for(auto&& thread: threads)
    {
        if(thread.joinable()) // still running or waiting to join
            continue;

        thread = std::thread(job);
        return;
    }

    // if not wait for one
    for(auto&& thread: threads)
    {
        if(!thread.joinable()) // dead thread (not run or already joined)
            continue;

        thread.join();
        thread = std::thread(job);
        return;
    }
}

int main()
{

    std::vector<std::thread> threads(5); // 5 threads

    for(int i = 0; i < 100; i++)
        start_thread(threads, [=]{someJob(i);});

    // wait for any unfinished threads    
    for(auto&& thread: threads)
        if(thread.joinable())
            thread.join();
}

【讨论】:

  • 您可能正在等待一个耗时较长的线程,而其他线程已完成。
  • @MichaëlRoy 当然。有更复杂、更复杂的方法可以提高效率。
  • 这个问题并不像看起来那么简单。直到我们在 stl 中看到类似 join_until() 的东西。
  • @MichaëlRoy 我认为这个问题涵盖了大量问题,我的解决方案很好地解决了这些问题。这只是一个简单的管道问题。当然,您可以找到更复杂的解决方案,在某些情况下可以提供更好的吞吐量。但这个问题几乎没有要求。
  • 我并没有试图淡化你的解决方案,如果你这样理解的话,对不起......我只是在陈述一个事实。这个问题不是微不足道的。甚至我的解决方案也不是那么完美,因为我分离了线程,并且在某些时候可能有超过 6 个线程在运行。
【解决方案2】:

您可以简单地使用std::async

如果你想在 5 个不同的异步动作中执行该函数 100 次,那么每个异步函数将执行该函数 20 次:

std::vector<std::future> results;
results.reserve(5);
for (auto i = 0; i< 5 ;i++){
   results.emplace_back([]{
     for(auto j = 0; j < 20 ; j++){
       doSomeFunction();
     } 
   });
}

for (auto& f : results){
    f.get();
}

同样的代码可以修改为裸std::thread

【讨论】:

  • 你不知道函数每次调用所花费的时间都是一样的。
【解决方案3】:

您应该使用Thread Pool

具体来说,您可以使用 C++ 线程池库CPTL,您的代码将如下所示:

ctpl::thread_pool p(2 /* two threads in the pool */);

for (int i = 0; i < 100; i++) {
    p.push(someJob, "additional_param");
}

【讨论】:

  • 谢谢,但我不能使用任何外部库
  • @JustASK:如果这是家庭作业,您应该在问题中明确说明。
  • 图书馆似乎也没有维护。自上次提交以来的 2 年和 4 个最长 2 年的未解决问题。
【解决方案4】:

OpenMP 将允许您简单地执行此操作,同时隐藏整个线程池。大多数编译器都有内置支持,但请参阅您的手册以获取特定选项。 (gcc 只需要传递 -fopenmp 作为选项)。

#pragma omp parallel for num_threads(5)
for (int i = 0; i < 100; i++) {
    someJob(i);
}

然后将您的工作分成 5 个线程。如果您忽略num_threads(5),它将自行选择多个线程。

【讨论】:

    【解决方案5】:

    这是一种在保持安全的同时动态实现线程池的方法。

    #include <thread>
    #include <vector>
    #include <algorithm>
    #include <mutex>
    
    void someJob() { /* some lengthy process */ }
    
    int main()
    {
        const size_t THREAD_COUNT = 5;
        std::vector<std::thread> threadPool;
        std::mutex mtx;               // to block access to the pool
    
        for (int i = 0; i < 100; i++) 
        {
            {
                // add new thread to the pool.
                std::lock_guard<std::mutex> lock(mtx);
                threadPool.emplace_back(std::thread([&mtx, &threadPool]()
                {
                    someJob();
    
                    // task is done, remove thread from pool
                    std::lock_guard<std::mutex> lock(mtx);
                    threadPool.erase(
                        std::find_if(threadPool.begin(), threadPool.end(), 
                        [](std::thread& x) 
                        { 
                            if (x.get_id() == std::this_thread::get_id())
                            {
                                x.detach();  // have to call detach, since we can't 
                                return true; // destroy an attached thread.
                            }
                            return false; 
                        })
                    );
                }));
            }
    
            for (;;)
            {
                // wait for a slot to be freed.
                std::this_thread::yield();
                std::lock_guard<std::mutex> lock(mtx);
                if (threadPool.size() < THREAD_COUNT)
                {
                    break;
                }
            }
        }
    
        // wait for the last tasks to be done
        for (;;)
        {
            std::this_thread::yield();
            std::lock_guard<std::mutex> lock(mtx); // works fine without.. but...
            if (threadPool.empty())                // <-- can't call join here, since detached 
            {                                      // threads are not joinable()
                break;
            }
        }
        return 0;
    }
    

    【讨论】:

      猜你喜欢
      • 2020-04-24
      • 2010-11-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-01-25
      • 2011-04-28
      • 2010-10-01
      • 1970-01-01
      相关资源
      最近更新 更多