【问题标题】:C++ spawning threads depending on CPU cores availableC++ 生成线程取决于可用的 CPU 内核
【发布时间】:2021-09-03 11:45:39
【问题描述】:

以下是创建进程池的 Python 代码,为每个文件分配一个进程,最多可使用 CPU 数量。

我想把它翻译成 C++(但我们使用线程,而不是进程)。

但是,据我所知,std::async 不允许限制 CPU 的数量。

std::async 怎么知道不让系统过载?在 2 核 CPU 上生成 20 个线程是没有意义的。

我真的希望避免为必须非常常见的东西编写自己的线程池。

实现这一目标的最简单方法是什么?

pool = multiprocessing.Pool(processes=max_cpus)

for file in file_list:
    pool.apply_async(my_func, args=(file,),)

pool.close()
pool.join()

【问题讨论】:

  • std::async 不知道您需要一个线程池。然而,产生与 cpu 一样多的线程并不意味着您将拥有所有的 cpu。稍后我会发布一个演示线程池。
  • std::async 不会(必然)盲目地创建与您排队的任务一样多的线程。在实践中,标准库实现者将拥有一个线程池,但该线程池的大小显然是一个可能基于硬件规范和/或启发式的实现细节。
  • @CoryKramer 我知道这适用于 Windows/MSVC(它使用线程池),但不一定适用于其他系统/编译器,并且每次调用只会使用新的 std::thread
  • 你可以使用std::thread::hardware_concurrency()来查找CPU的数量。

标签: c++


【解决方案1】:

c++ 中的线程池示例,你也可以使用 boost 中的那个 (这可能是更好的测试)。

#include <condition_variable>
#include <exception>
#include <mutex>
#include <future>
#include <thread>
#include <vector>
#include <queue>

namespace details
{
class task_itf
{
public:
    virtual void execute() = 0;
};

template<typename retval_t>
class task final :
    public task_itf
{
public:
    template<typename lambda_t>
    explicit task(lambda_t&& lambda) :
        m_task(lambda)
    {
    }

    std::future<retval_t> get_future()
    {
        return m_task.get_future();
    }

    virtual void execute() override
    {
        m_task();
    }

private:
    std::packaged_task<retval_t()> m_task;
};

class stop_exception :
    public std::exception
{
};

}

class thread_pool
{
public:
    explicit thread_pool(const size_t size) :
        m_is_running{ true }
    {
        std::condition_variable signal_started;
        std::atomic<size_t> number_of_threads_started{ 0u };

        for (auto n = 0; n < size; ++n)
        {
            m_threads.push_back(std::move(std::thread([&]()
            {
                {
                    number_of_threads_started++;
                    signal_started.notify_all();
                }

                thread_loop();
            })));
        }

        // wait for all threads to have started.
        std::mutex mtx;
        std::unique_lock<std::mutex> lock{ mtx };
        signal_started.wait(lock, [&] { return number_of_threads_started == size; });
    }

    ~thread_pool()
    {
        m_is_running = false;
        m_wakeup.notify_all();

        for (auto& thread : m_threads)
        {
            thread.join();
        }
    }

    template<typename lambda_t>
    auto async(lambda_t&& lambda) 
    {
        using retval_t = decltype(lambda());
        auto task = std::make_shared<details::task<retval_t>>(lambda);
        queue_task(task);
        return task->get_future();
    }

    template<typename lambda_t>
    auto sync(lambda_t&& lambda) 
    {
        auto ft = async(lambda);
        return ft.get();
    }

private:
    void queue_task(const std::shared_ptr<details::task_itf>& task_ptr)
    {
        std::unique_lock<std::mutex> lock(m_queue_mutex);
        m_queue.push(task_ptr);
        m_wakeup.notify_one();
    }

    std::shared_ptr<details::task_itf> get_next_task()
    {
        std::unique_lock<std::mutex> lock(m_queue_mutex);
        m_wakeup.wait(lock);

        if (!m_is_running)
        {
            throw details::stop_exception();
        }

        auto task = m_queue.front();
        m_queue.pop();

        return task;
    }

    void thread_loop()
    {
        try
        {
            while (auto task = get_next_task())
            {
                task->execute();
            }
        }
        catch (const details::stop_exception&)
        {
        }
    }

    std::vector<std::thread> m_threads;
    std::mutex m_queue_mutex;
    std::queue<std::shared_ptr<details::task_itf>> m_queue;

    std::condition_variable m_wakeup;
    bool m_is_running;
};

int main()
{
    thread_pool pool(4);
    auto ft1 = pool.async([] {return 1; });
    auto ft2 = pool.async([] {return 2; });
    auto ft3 = pool.async([] {return 3; });
    auto ft4 = pool.async([] {return 4; });

    // pool is will still be waiting

    //synchronize with results
    auto r1 = ft1.get();
    auto r2 = ft2.get();
    auto r3 = ft3.get();
    auto r4 = ft4.get();

    return 0;
}

【讨论】:

    【解决方案2】:

    std::thread::hardware_concurrency() 将告诉您一次有多少 CPU 线程处于活动状态,或者您应该在线程池中保留多少线程(无论是 boost 还是您自己滚动)。

    以下是该方法的外观:

    • 调用std::thread::hardware_concurrency()获取核心数
    • 初始化线程池(现有的 impl 或您自己的)和结果
    • 将所需的计算包装在 std::packaged_task 中并将任务提交到线程池作业队列

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-06-09
      • 2012-10-12
      • 1970-01-01
      • 2017-08-22
      • 2020-04-23
      • 2014-02-07
      相关资源
      最近更新 更多