【问题标题】:C++: Thread pool slower than single threading?C++:线程池比单线程慢?
【发布时间】:2016-01-25 05:10:39
【问题描述】:

首先,我确实查看了该网站上的其他主题,发现它们与我的问题无关,因为这些主题主要涉及使用 I/O 操作或线程创建开销的人。我的问题是我的线程池或工作任务结构实现(在这种情况下)比单线程慢很多。我对此感到非常困惑,不确定是线程池、任务本身、我如何测试它、线程的性质还是我无法控制的东西。

// Sorry for the long code
#include <vector>
#include <queue>

#include <thread>
#include <mutex>
#include <future>

#include "task.hpp"

class ThreadPool
{
public:
    ThreadPool()
    {
        for (unsigned i = 0; i < std::thread::hardware_concurrency() - 1; i++)
            m_workers.emplace_back(this, i);

        m_running = true;
        for (auto&& worker : m_workers)
            worker.start();
    }
    ~ThreadPool()
    {
        m_running = false;
        m_task_signal.notify_all();
        for (auto&& worker : m_workers)
            worker.terminate();
    }

    void add_task(Task* task)
    {
        {
            std::unique_lock<std::mutex> lock(m_in_mutex);
            m_in.push(task);
        }
        m_task_signal.notify_one();
    }
private:
    class Worker
    {
    public:
        Worker(ThreadPool* parent, unsigned id) : m_parent(parent), m_id(id)
        {}
        ~Worker()
        {
            terminate();
        }

        void start()
        {
            m_thread = new std::thread(&Worker::work, this);
        }
        void terminate()
        {
            if (m_thread)
            {
                if (m_thread->joinable())
                {
                    m_thread->join();
                    delete m_thread;
                    m_thread = nullptr;
                    m_parent = nullptr;
                }
            }
        }
    private:
        void work()
        {
            while (m_parent->m_running)
            {               
                std::unique_lock<std::mutex> lock(m_parent->m_in_mutex);
                m_parent->m_task_signal.wait(lock, [&]()
                {
                    return !m_parent->m_in.empty() || !m_parent->m_running;
                });

                if (!m_parent->m_running) break;
                Task* task = m_parent->m_in.front();
                m_parent->m_in.pop();
                // Fixed the mutex being locked while the task is executed
                lock.unlock();

                task->execute();            
            }
        }
    private:
        ThreadPool* m_parent = nullptr;
        unsigned m_id = 0;

        std::thread* m_thread = nullptr;
    };
private:
    std::vector<Worker> m_workers;

    std::mutex m_in_mutex;
    std::condition_variable m_task_signal;
    std::queue<Task*> m_in;

    bool m_running = false;
};

class TestTask : public Task
{
public:
    TestTask() {}
    TestTask(unsigned number) : m_number(number) {}

    inline void Set(unsigned number) { m_number = number; }

    void execute() override
    {
        if (m_number <= 3)
        {
            m_is_prime = m_number > 1;
            return;
        }
        else if (m_number % 2 == 0 || m_number % 3 == 0)
        {
            m_is_prime = false;
            return;
        }
        else
        {
            for (unsigned i = 5; i * i <= m_number; i += 6)
            {
                if (m_number % i == 0 || m_number % (i + 2) == 0)
                {
                    m_is_prime = false;
                    return;
                }
            }
            m_is_prime = true;
            return;
        }
    }
public:
    unsigned m_number = 0;
    bool m_is_prime = false;
};

int main()
{
    ThreadPool pool;

    unsigned num_tasks = 1000000;
    std::vector<TestTask> tasks(num_tasks);
    for (auto&& task : tasks)
        task.Set(randint(0, 1000000000));

    auto s = std::chrono::high_resolution_clock::now();
    #if MT
    for (auto&& task : tasks)
        pool.add_task(&task);
    #else
    for (auto&& task : tasks)
        task.execute();
    #endif
    auto e = std::chrono::high_resolution_clock::now();
    double seconds = std::chrono::duration_cast<std::chrono::nanoseconds>(e - s).count() / 1000000000.0;
}

VS2013 Profiler 的基准测试:

10,000,000 tasks:
    MT:
        13 seconds of wall clock time
        93.36% is spent in msvcp120.dll
        3.45% is spent in Task::execute() // Not good here
    ST:
        0.5 seconds of wall clock time
        97.31% is spent with Task::execute()

【问题讨论】:

  • 首先展示您的“耗时”代码、如何进行测量以及如何编译它。可能很重要。
  • @deviantfan 我发现这个错误太晚了。修改后的答案。
  • 多少个核心?如果只有一个,多线程代码很容易比单线程慢。
  • 您的程序不会改变环境(stdio、文件系统、网络、渲染等)。理论上,过于聪明的编译器可以优化您的程序以使其无操作或接近它(因为它不会改变可观察的结果)。可以发一个ST变种main的反汇编吗?
  • 在MT版本中,您不应该等待任务完成吗?

标签: c++ multithreading


【解决方案1】:

此类答案中的常见免责声明:唯一确定的方法是使用分析器工具对其进行测量。

但我会尝试在没有它的情况下解释你的结果。首先,所有线程都有一个互斥锁。所以一次只有一个线程可以执行某些任务。它会扼杀你可能拥有的所有收益。尽管您有线程,但您的代码是完全串行的。所以至少让你的任务执行在互斥锁之外。您只需要锁定互斥锁以将任务从队列中取出 - 当任务执行时您不需要持有它。

接下来,您的任务非常简单,单线程将立即执行它们。您无法衡量此类任务的任何收益。创建一些繁重的任务,可以产生一些更有趣的结果(一些更接近现实世界的任务,而不是那么做作)。

第三点:线程并非没有成本——上下文切换、互斥争用等。如前两点所说,要获得真正的收益,您需要有比线程引入的开销更多时间的任务,并且代码应该是真正并行的,而不是等待某些资源使其串行。

UPD:我查看了代码的错误部分。只要您创建的任务数量足够多,任务就足够复杂。


UPD2:我玩过你的代码,发现了一个很好的质数来展示 MT 代码的优势。使用以下素数:1019048297。它将提供足够的计算复杂度来显示差异。

但是为什么你的代码没有产生好的结果呢?没有看到randint() 的实现很难说,但我认为它非常简单,在一半的情况下它返回偶数,而其他情况也不会产生很多大素数。因此,这些任务非常简单,以至于围绕特定实现和线程的上下文切换和其他事情通常比计算本身消耗更多时间。使用我给你的素数让任务别无选择,只能花时间计算——这不是一个简单的答案,因为这个数字很大而且实际上是素数。这就是为什么大数字会给你你所寻求的答案——MT 代码的更好时间。

【讨论】:

  • 你有什么不难完成的繁重任务吗?
  • @James,请参阅更新后的答案。我查看了您代码的错误部分
  • 我已修复互斥锁问题并使用分析器更新基准
  • 我明白了...我最初的目标是在我正在处理的另一个项目中卸载 ST 任务。我会看看这个功能是否值得。
【解决方案2】:

在执行任务时不要持有互斥锁,否则其他线程将无法获得任务:

void work() {
    while (m_parent->m_running) {   
        Task* currentTask = nullptr;    
        std::unique_lock<std::mutex> lock(m_parent->m_in_mutex);
        m_parent->m_task_signal.wait(lock, [&]() {
            return !m_parent->m_in.empty() || !m_parent->m_running;
        });                     
        if (!m_parent->m_running) continue;
        currentTask = m_parent->m_in.front();
        m_parent->m_in.pop();               
        lock.unlock(); //<- Release the lock so that other threads can get tasks
        currentTask->execute();
        currentTask = nullptr;
    }   
}       

【讨论】:

  • 我刚刚从 ixSci 的回答中解决了这个问题
  • @James 很高兴听到这个消息,只需确保将答案标记为已接受,这有助于您解决问题。
【解决方案3】:

对于 MT,“开销”的每个阶段花费了多少时间:std::unique_lockm_task_signal.waitfrontpopunlock

根据您仅 3% 有用工作的结果,这意味着上述消耗了 97%。我会得到上述每个部分的数字(例如,在每次调用之间添加时间戳)。

在我看来,您用于 [仅] 使下一个任务指针出列的代码非常繁重。我会做一个更简单的队列[可能是无锁的]机制。或者,也许,使用原子将索引插入队列,而不是上面的五步过程。例如:

void
work()
{
    while (m_parent->m_running) {
        // NOTE: this is just an example, not necessarily the real function
        int curindex = atomic_increment(&global_index);
        if (curindex >= max_index)
            break;

        Task *task = m_parent->m_in[curindex];

        task->execute();
    }
}

另外,也许你应该一次弹出[比如说]十个而不是一个。

您可能还受到内存限制和/或“任务切换”限制。 (例如)对于访问数组的线程,四个以上的线程通常会使内存总线饱和。你也可能对锁有严重的争用,这样线程就会饿死,因为一个线程正在独占锁[间接地,即使使用新的unlock 调用]

线程间锁定通常涉及“序列化”操作,其中其他内核必须同步它们的乱序执行管道。

这是一个“无锁”实现:

void
work()
{
    // assume m_id is 0,1,2,...
    int curindex = m_id;

    while (m_parent->m_running) {
        if (curindex >= max_index)
            break;

        Task *task = m_parent->m_in[curindex];

        task->execute();

        curindex += NUMBER_OF_WORKERS;
    }
}

【讨论】:

  • 我有同样的想法,可能会推送 4 个(或任何多个)任务并通知所有 4 个线程并获取一定数量的任务。我有点避免原子,因为我还没有完全学习它们,但乍一看它们似乎比我的设置更好。我试试看。
  • 这是一个链接:stackoverflow.com/questions/33083270/… 它有一个我创建的 CAS 实现。但是,更重要的是,它里面有一个链接,指向 cppcon 上展示的关于无锁的视频讨论
猜你喜欢
  • 2020-08-15
  • 1970-01-01
  • 1970-01-01
  • 2012-09-05
  • 1970-01-01
  • 1970-01-01
  • 2020-05-16
  • 1970-01-01
相关资源
最近更新 更多