【问题标题】:Fair queue loses notifications公平队列丢失通知
【发布时间】:2015-09-01 13:27:57
【问题描述】:

考虑下面的代码

#include <thread>
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>

template <typename T>
class Tqueue 
{
public:

   Tqueue() : m_next_ticket(0),
              m_counter(0) {}

   void push(const T& e){
       std::unique_lock<std::mutex> lock(m_mutex);
       m_queue.push(e);
       lock.unlock();
       m_cond.notify_all();
    };

   T wait_and_pop() {
       std::unique_lock<std::mutex> lock(m_mutex);
       int ticket = m_next_ticket++;
       m_cond.wait(lock,[=]{return (!m_queue.empty())
                  && (ticket == m_counter);});
       m_counter++;
       T data = m_queue.front();
       m_queue.pop();
       return data;
   }

private:
   int m_next_ticket;
   int m_counter;
   std::queue<T> m_queue;
   std::mutex m_mutex;
   std::condition_variable m_cond;   
};

这应该是我想出的公平队列的模板。在这种情况下,公平意味着 wait_and_pop() 调用以不同线程调用它们的相同顺序返回。

例如: 线程 1 在空队列上调用 wait_and_pop() 并阻塞。然后 Thread 2 在一个空队列上调用 wait_and_pop() 并阻塞。之后 Thread 3 使用 push() 推送两个事件。现在 Thread 1 应该在 Thread 2 之前返回。

使用以下代码,它有时会起作用。但大多数时候 代码永远阻塞:

Tqueue<int> queue;

std::mutex mutex;

void test(int i) 
{
    auto bla = queue.wait_and_pop();
    std::cout << "Thread : "<<bla << std::endl;
}

const int SIZE = 200;

int main(int argc, char *argv[])
{
    std::vector<std::thread> threads;
    for(int i = 0; i < SIZE; ++i)
       threads.push_back(std::thread(test,i));
    for(int i = 0; i < SIZE; ++i)
        queue.push(i);
    for(int i = 0; i < SIZE; ++i)
       threads[i].join();
    return 0;
}

这个想法是为每个线程创建一个唯一的票证。使用条件变量,然后我们在 wait_and_pop() 函数中等待,直到 插入了一个新事件。在 push() 函数中,新事件被插入队列并通知所有等待线程。每个线程检查是否 如果唯一票等于当前计数器,则队列不再为空。如果是这样,特定线程离开条件循环, 从队列中弹出当前事件并增加计数器。

我怀疑有些通知丢失了,但我无法理解为什么会这样。任何想法如何解决此问题或如何以正确的方式实现此问题?

编辑 我将队列中的代码更改如下。现在它似乎工作了。 重要的部分是,我通知,同时仍然持有锁(在 push() 和 wait_and_pop() 中)。此外,我将票证系统更改为线程 ID 队列,但这只是为了方便,它可以保持源代码紧凑。但我不确定,如果我想使用队列 生产代码,因为我不明白为什么它现在可以工作,而且我不知道它是否适用于所有情况。也许有人可以对此发表评论?

template <typename T>
class Tqueue 
{
public:
   void push(const T& e){
       std::unique_lock<std::mutex> lock(m_mutex);
       m_queue.push(e);
       m_cond.notify_all();
   };

   T wait_and_pop() {
       std::unique_lock<std::mutex> lock(m_mutex);
       m_ids.push(std::this_thread::get_id());
       m_cond.wait(lock,[=]{return (!m_queue.empty())
                  && (m_ids.front() == std::this_thread::get_id());});
       T data = m_queue.front();
       m_queue.pop();
       m_ids.pop();
       m_cond.notify_all();
       return data;
   }

private:
   std::queue<T> m_queue;
   std::queue<std::thread::id> m_ids;
   std::mutex m_mutex;
   std::condition_variable m_cond;

 };

【问题讨论】:

  • 出色的线程练习:)
  • 请注意,您在超出临界区后进行打印。这意味着线程值将以随机顺序打印。
  • 它现在可以工作了,因为您确保在更改线程可能正在等待的共享数据时唤醒线程。您是否仍然有一个您认为应该可以工作的不工作的版本?如果是这样,请更新,有人会解释为什么它不起作用。 (顺便说一句,你真的有一个公平比性能更重要的用例吗?这不像线程文件联合申诉。)
  • 错误是你没有在wait_and_pop 中调用notify_all。在这种情况下,无论是否使用锁从push 调用notify_all 都没有区别(除了在大多数平台上使用锁调用它的效率稍高一些)。

标签: c++ multithreading stl notifications thread-safety


【解决方案1】:

通知确实丢失了。有可能大量的push 产生更少的线程被唤醒,因为当m_cond.notify_all(); 被执行时,它只是让等待线程runnable,即准备好运行。这些线程仍然需要等待轮到它们并获取m_cond.wait 内部的锁。

还有可能是主线程在单个等待线程最终执行之前继续获取互斥锁几次。这会导致通知饥饿。

为了使机制发挥作用,您需要随时通知情况受到影响。您已经通知m_queue.push(e);,这会影响第一个条件!m_queue.empty()。您还需要在wait_and_pop 末尾通知,以处理第二个条件ticket == m_counter

T wait_and_pop() {
   ....blah blah
   T data = m_queue.front();
   m_queue.pop();

   lock.unlock();
   m_cond.notify_all();
   return data;
}

注意:it is possible 在这里我的意思是 “最终会有一个线程调度最终发生”。我不是说“我不确定”

进一步说明: condition_variable.notify_all() 只保证最终唤醒线程。它不保证X 的呼叫次数将唤醒X 次。此外,由于你的情况,它被减少到保证只通知一个线程,这是根本原因。

关于wait_and_pop解锁前后通知
在释放wait_and_pop 的锁之前或之后通知应该没有任何区别。我指定的修改应该与编辑中的修改相同。我一直在进行一些变化不大的测试(线程数、等待 x 线程完成并再次推送),结果相同。

【讨论】:

  • 虽然您建议的解决方案是一种改进,但我仍然不时遇到死锁。我仍然不明白为什么我会丢失通知。但也许我只是不够聪明。这些多线程问题让我头疼:)
  • 简单来说,queue.push(i); 可以生成 200 条通知,但线程只唤醒一次。这是最坏的情况,而且可能性很小,但这是真的。由于操作系统调度程序决定保持主线程运行。
  • 如果你在wait_and_pop中仍然持有锁时通知会发生什么?
  • 如果你先全部推送然后再等待会发生什么?
  • 感谢您的进一步解释。当我先推动然后等待一切按预期工作。当我在仍然持有锁的同时通知时,它似乎工作。我不确定,因为我在另一台机器上测试了这个。我明天再试一次。
【解决方案2】:

如果在你开始推送后只启动了你的一个线程,它将永远被阻塞。 如果队列不为空,您应该先尝试。既然已经有数据,何必等。

【讨论】:

  • 不,实际上我不这么认为。当队列中已经有一个元素时,前提条件为真,不会发生阻塞。
  • 你是对的 - 它正在工作: m_cond.wait(lock, [=]{ return !m_queue.empty(); } );但如果我也使用ticket + m_counter,则不再使用。