【问题标题】:C++11 thread-safe queueC++11 线程安全队列
【发布时间】:2013-02-23 01:45:57
【问题描述】:

我正在处理的一个项目使用多个线程来处理一组文件。每个线程都可以将文件添加到要处理的文件列表中,因此我将(我认为是)一个线程安全队列放在一起。相关部分如下:

// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
//                   notify waiting threads of a new item in the queue

void FileQueue::enqueue(std::string&& filename)
{
    std::lock_guard<std::mutex> lock(qMutex);
    q.push(std::move(filename));

    // Notify anyone waiting for additional files that more have arrived
    populatedNotifier.notify_one();
}

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    if (q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
            std::string ret = q.front();
            q.pop();
            return ret;
        }
        else {
            return std::string();
        }
    }
    else {
        std::string ret = q.front();
        q.pop();
        return ret;
    }
}

但是,我偶尔会在 if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { } 块内出现段错误,并且 gdb 中的检查表明正在发生段错误,因为队列为空。这怎么可能?据我了解,wait_for 仅在收到通知后才返回 cv_status::no_timeout,并且这仅应在 FileQueue::enqueue 刚刚将新项目推送到队列后发生。

【问题讨论】:

  • 问题,你为什么要通过 ref-ref 获取filename?我在这里看不到任何原因>
  • @TonyTheLion 通常在 C++ 中,通过引用传递对象比制作副本更有效。在这种情况下,我还使用了移动语义,它允许编译器将字符串的内容移动到队列中,而不是制作另一个副本。
  • @slavik262:你在这里使用std::forward是不正常的(在“通用引用”中使用),你应该只是std::move它。
  • 实际上在这里利用移动语义的首选方法是使用std::move 并通过值而不是通过非常量右值引用来获取enqueuefilename 参数。事实上,它只能用右值调用,这可能不是你想要的。

标签: c++ multithreading c++11 queue condition-variable


【解决方案1】:

最好使条件(由您的条件变量监控)成为 while 循环的逆条件: while(!some_condition)。在这个循环中,如果条件失败,你就进入睡眠状态,触发循环体。

这样,如果您的线程被唤醒(可能是虚假的),您的循环仍会在继续之前检查条件。将条件视为感兴趣的状态,并将条件变量视为更多来自系统的信号,即该状态可能是准备好。循环将完成实际确认它是真的繁重的工作,如果不是,则进入睡眠状态。

我刚刚为异步队列编写了一个模板,希望对您有所帮助。在这里,q.empty() 是我们想要的相反条件:队列中有东西。所以它作为while循环的检查。

#ifndef SAFE_QUEUE
#define SAFE_QUEUE

#include <queue>
#include <mutex>
#include <condition_variable>

// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
  SafeQueue(void)
    : q()
    , m()
    , c()
  {}

  ~SafeQueue(void)
  {}

  // Add an element to the queue.
  void enqueue(T t)
  {
    std::lock_guard<std::mutex> lock(m);
    q.push(t);
    c.notify_one();
  }

  // Get the "front"-element.
  // If the queue is empty, wait till a element is avaiable.
  T dequeue(void)
  {
    std::unique_lock<std::mutex> lock(m);
    while(q.empty())
    {
      // release lock as long as the wait and reaquire it afterwards.
      c.wait(lock);
    }
    T val = q.front();
    q.pop();
    return val;
  }

private:
  std::queue<T> q;
  mutable std::mutex m;
  std::condition_variable c;
};
#endif

【讨论】:

  • 谢谢!谢天谢地,我已经使用here 描述的谓词解决了这个问题。
  • 最简单、最优雅的解决方案,恕我直言。
  • 仅供参考:while(q.empty()) 循环相当于:c.wait( lock, [&amp;]{ return !q.empty(); } );
  • dequeue 被调用时,队列为空,它将不必要地获取锁并持有它。这会让 enqueuer 阻塞。你遇到了僵局。
  • @Ajay 为了那些将来会来这里(像我一样)阅读你评论的人的利益,不,事实并非如此,因为condition_variable::wait releases the lock and only reacquires when awaken
【解决方案2】:

根据标准condition_variables 允许虚假唤醒,即使事件尚未发生。如果发生虚假唤醒,它将返回cv_status::no_timeout(因为它是唤醒而不是超时),即使它没有被通知。正确的解决方案当然是在继续之前检查唤醒是否真的合法。

详细信息在标准§30.5.1 [thread.condition.condvar]中指定:

- 当调用 notify_one()、调用 notify_all()、abs_time 指定的绝对超时 (30.2.4) 到期或虚假发出信号时,该函数将解除阻塞。

...

返回: 如果 abs_time 指定的绝对超时 (30.2.4) 已过期,则返回 cv_status::timeout,否则为 cv_status::no_timeout。

【讨论】:

  • 你建议怎么做?只需检查队列是否再次为空?
  • 附录:我可能会使用诸如 here 之类的谓词来防止虚假唤醒。
  • 是的。它被称为 condition 变量,因为它与某个条件相关联,您必须检查它是否为真。在您的情况下,要检查的条件是 !q.empty()
  • 如果您使用 lambda 作为 wait() 调用的可选参数,它将为您进行检查并防止虚假唤醒产生任何影响。
  • 作为历史记录,这很可能是为了支持低级 Unix 系统调用的行为。如果从客户设计的角度来看,这有点令人不满意,那么早在 1989 年,Richard Gabriel 也是如此。他对这个主题的思考成为了一篇非常著名的软件设计文章,The Rise of "Worse-is-Better"
【解决方案3】:

这可能是你应该这样做:

void push(std::string&& filename)
{
    {
        std::lock_guard<std::mutex> lock(qMutex);

        q.push(std::move(filename));
    }

    populatedNotifier.notify_one();
}

bool try_pop(std::string& filename, std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);

    if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); }))
        return false;

    filename = std::move(q.front());
    q.pop();

    return true;    
}

【讨论】:

  • 谢谢!我最终做了类似的事情。
  • 两个 cmets 在其他很好的代码上:1) 在 notify_one 之前,我会出于 en.cppreference.com/w/cpp/thread/condition_variable/notify_one 上找到的原因解锁互斥锁 2) 等待过程可能会被虚假唤醒,所以我会另外引入一个 bool表示推送确实结束的变量
  • 1) 小幅优化。 2) wair_for 的重载通过第二个参数处理虚假唤醒。
  • 对不起,你是对的。实际上,1) 已经通过 lock_guard 完成了。我对 2) 的推理错误是,如果队列不为空,我认为推送操作可能正在进行中……但是,在这种情况下,仍然阻塞的互斥锁无论如何都会保持 wait_for 等待。谢谢!
【解决方案4】:

除了已接受的答案之外,我想说的是,实现正确的多生产者/多消费者队列很困难(不过,自 C++11 起更容易)

我建议你试试(非常好)lock free boost library,“队列”结构会做你想做的事,有无等待/无锁保证和without the need for a C++11 compiler

我现在添加这个答案是因为无锁库对于提升来说是相当新的(我相信从 1.53 开始)

【讨论】:

  • 感谢您指出该库。但是,目前似乎没有队列的文档。知道在哪里可以找到吗?
【解决方案5】:

我会将你的出队函数重写为:

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    while(q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout ) 
           return std::string();
    }
    std::string ret = q.front();
    q.pop();
    return ret;
}

它更短,并且没有像您那样重复的代码。只有发出它可能会等待更长时间的超时。为防止您需要记住循环之前的开始时间,请检查超时并相应地调整等待时间。或者指定等待条件的绝对时间。

【讨论】:

    【解决方案6】:

    这个案例也有GLib解决方案,我还没试过,但我相信这是一个很好的解决方案。 https://developer.gnome.org/glib/2.36/glib-Asynchronous-Queues.html#g-async-queue-new

    【讨论】:

      【解决方案7】:

      BlockingCollection 是一个 C++11 线程安全集合类,提供对队列、堆栈和优先级容器的支持。它处理您描述的“空”队列场景。以及“完整”队列。

      【讨论】:

        【解决方案8】:

        您可能会喜欢 lfqueue,https://github.com/Taymindis/lfqueue。 它是无锁并发队列。我目前正在使用它来消耗来自多个来电的队列,并且工作起来就像一个魅力。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2014-12-14
          • 2014-03-13
          • 1970-01-01
          • 2023-03-04
          • 1970-01-01
          • 2015-02-16
          • 2012-11-05
          • 2010-11-15
          相关资源
          最近更新 更多