【问题标题】:Thread pool stuck on wait condition线程池卡在等待状态
【发布时间】:2016-06-02 22:26:07
【问题描述】:

我在使用这个线程池类时遇到了卡在我的 c++ 程序中:

class ThreadPool {

    unsigned threadCount;
    std::vector<std::thread> threads;
    std::list<std::function<void(void)> > queue;

    std::atomic_int jobs_left;
    std::atomic_bool bailout;
    std::atomic_bool finished;
    std::condition_variable job_available_var;
    std::condition_variable wait_var;
    std::mutex wait_mutex;
    std::mutex queue_mutex;
    std::mutex mtx;

    void Task() {
        while (!bailout) {
            next_job()();
            --jobs_left;
            wait_var.notify_one();
       }
    }

    std::function<void(void)> next_job() {
        std::function<void(void)> res;
        std::unique_lock<std::mutex> job_lock(queue_mutex);

        // Wait for a job if we don't have any.
        job_available_var.wait(job_lock, [this]()->bool { return queue.size() || bailout; });

        // Get job from the queue
        mtx.lock();
        if (!bailout) {
            res = queue.front();
            queue.pop_front();
        }else {
            // If we're bailing out, 'inject' a job into the queue to keep jobs_left accurate.
            res = [] {};
            ++jobs_left;
        }
        mtx.unlock();
        return res;
    }

public:
    ThreadPool(int c)
        : threadCount(c)
        , threads(threadCount)
        , jobs_left(0)
        , bailout(false)
        , finished(false)
    {
        for (unsigned i = 0; i < threadCount; ++i)
            threads[i] = std::move(std::thread([this, i] { this->Task(); }));
    }

    ~ThreadPool() {
        JoinAll();
    } 

    void AddJob(std::function<void(void)> job) {
        std::lock_guard<std::mutex> lock(queue_mutex);
        queue.emplace_back(job);
        ++jobs_left;
        job_available_var.notify_one();
    }

    void JoinAll(bool WaitForAll = true) {
        if (!finished) {
            if (WaitForAll) {
                WaitAll();
            }

            // note that we're done, and wake up any thread that's
            // waiting for a new job
            bailout = true;
            job_available_var.notify_all();

            for (auto& x : threads)
                if (x.joinable())
                    x.join();
            finished = true;
        }
    }

    void WaitAll() {
        std::unique_lock<std::mutex> lk(wait_mutex);
        if (jobs_left > 0) {
            wait_var.wait(lk, [this] { return this->jobs_left == 0; });
        }
        lk.unlock();
    }
};

gdb 说(停止阻塞执行时)卡在 (std::unique_lock&, ThreadPool::WaitAll()::{lambda()#1})+58>

我正在使用支持 c++14 (-std=c++1y) 的 g++ v5.3.0

如何避免这个问题?

更新

我已经编辑(重写)了课程:https://github.com/edoz90/threadpool/blob/master/ThreadPool.h

【问题讨论】:

  • 我相当肯定 std::atomic_bool 上的操作符不是原子操作。唯一有效的原子操作是标准的“原子操作库”部分中指定的一组 atomic_foo() 方法。因此,无互斥量的减量+notify_one() 可能没有与互斥量 lock+wait() 正确排序。我有点守旧。我还没有使用所有这些新的、花哨的 atomic-this-es 和 atomic-that-es。我使用互斥保护变量的老式方法,并且仅在持有互斥锁时通知。
  • wait_mutex 的目的是什么?
  • @SamVarshavchik all member functions of atomic are atomic 并且默认内存顺序是 memory_order_seq_cst
  • operator-- 在原子变量 is 上是原子的(等效于 fetch_add (-1))。这里的问题是在工作线程和调用AddJobJoinAll 的线程之间存在jobs_left 的竞争条件。 IMO,您应该摆脱该变量并直接使用作业队列,使用互斥体保护它,包括在 JoinAll 内部,并测试它的大小。
  • 首先您等待job_left 为零-然后您提高bailout-然后由于bailout 而将新作业添加到队列中-非常复杂-您为什么要这样做? finished 的目的是什么?而且您不需要在JoinAll 中使用queue_mutex 吗?一个非常简单的类的非常复杂的代码。

标签: c++ multithreading threadpool mutex


【解决方案1】:

这里的问题是你的工作数量的竞争条件。您正在使用一个互斥锁来保护队列,另一个来保护计数,这在语义上等同于队列大小。显然,第二个互斥体是多余的(并且使用不当),job_count 变量本身也是如此。

每个处理队列的方法都必须获得对它的独占访问权(甚至JoinAll 来读取它的大小),所以你应该在篡改它的三位代码中使用相同的queue_mutex(@987654324 @、AddJobnext_job)。

顺便说一句,在next_job() 处拆分代码在 IMO 中非常尴尬。如果您在单个函数中处理工作线程主体,您将避免调用虚拟函数。

编辑:

正如其他 cmets 已经说过的那样,您最好不要把注意力从代码上移开,并在全球范围内重新考虑问题。

这里你唯一需要保护的是作业队列,所以你只需要一个互斥体。

然后是唤醒各种actor的问题,这需要一个条件变量,因为C++基本上没有给你任何其他可用的同步对象。

这里你不需要多个变量。终止线程池相当于将作业出列而不执行它们,这可以通过任何方式完成,无论是在工作线程本身中(如果设置了终止标志,则跳过执行)或在JoinAll函数中(清除队列获得独家访问权后)。

最后但并非最不重要的一点是,一旦有人决定关闭池,您可能希望使 AddJob 无效,否则您可能会在有人继续提供新工作时卡在析构函数中。

【讨论】:

  • “在 next_job() 处拆分代码非常尴尬”是什么意思? :)
  • 即使没有要执行的代码也不需要返回函数。您宁愿仅在实际作业出队时才执行该函数。
  • 听起来更干净、更简单。我真的没有时间测试它,但它看起来不错。您仍然可以在终止期间实施“不再接受工作”政策。一个简单的断言就可以解决问题。这个想法是通过检测奇怪的用例来让你的类更健壮,比如有人一直在找工作,而其他人试图关闭池。
【解决方案2】:

我认为你需要保持简单。 您似乎使用了太多互斥锁。所以有queue_mutex,您在添加和处理作业时使用它。

现在,当您等待读取队列时,需要另一个单独的互斥锁吗?

为什么不能只使用具有相同queue_mutex 的条件变量来读取WaitAll() 方法中的队列?

更新

我还建议在您的WaitAll 中使用lock_guard 而不是unique_lock。在特殊情况下,确实不需要将queue_mutex 锁定在WaitAll 之外。如果您异常退出WaitAll,则无论如何都应该释放它。

更新2

忽略我上面的更新。由于您使用的是条件变量,因此您不能在 WaitAll 中使用锁保护。但是,如果您使用的是unique_lock,请始终使用try_to_lock 版本,尤其是当您有多个控制路径时

【讨论】:

  • 您的意思是将wait_mutex 更改为queue_mutex
  • 既然是同一个队列,就应该使用同一个互斥锁来同步访问。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-08-29
  • 2010-10-16
  • 2014-06-20
  • 2015-04-15
  • 2015-07-13
  • 1970-01-01
相关资源
最近更新 更多