【发布时间】:2015-09-01 13:27:57
【问题描述】:
考虑下面的代码
#include <thread>
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class Tqueue
{
public:
Tqueue() : m_next_ticket(0),
m_counter(0) {}
void push(const T& e){
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push(e);
lock.unlock();
m_cond.notify_all();
};
T wait_and_pop() {
std::unique_lock<std::mutex> lock(m_mutex);
int ticket = m_next_ticket++;
m_cond.wait(lock,[=]{return (!m_queue.empty())
&& (ticket == m_counter);});
m_counter++;
T data = m_queue.front();
m_queue.pop();
return data;
}
private:
int m_next_ticket;
int m_counter;
std::queue<T> m_queue;
std::mutex m_mutex;
std::condition_variable m_cond;
};
这应该是我想出的公平队列的模板。在这种情况下,公平意味着 wait_and_pop() 调用以不同线程调用它们的相同顺序返回。
例如: 线程 1 在空队列上调用 wait_and_pop() 并阻塞。然后 Thread 2 在一个空队列上调用 wait_and_pop() 并阻塞。之后 Thread 3 使用 push() 推送两个事件。现在 Thread 1 应该在 Thread 2 之前返回。
使用以下代码,它有时会起作用。但大多数时候 代码永远阻塞:
Tqueue<int> queue;
std::mutex mutex;
void test(int i)
{
auto bla = queue.wait_and_pop();
std::cout << "Thread : "<<bla << std::endl;
}
const int SIZE = 200;
int main(int argc, char *argv[])
{
std::vector<std::thread> threads;
for(int i = 0; i < SIZE; ++i)
threads.push_back(std::thread(test,i));
for(int i = 0; i < SIZE; ++i)
queue.push(i);
for(int i = 0; i < SIZE; ++i)
threads[i].join();
return 0;
}
这个想法是为每个线程创建一个唯一的票证。使用条件变量,然后我们在 wait_and_pop() 函数中等待,直到 插入了一个新事件。在 push() 函数中,新事件被插入队列并通知所有等待线程。每个线程检查是否 如果唯一票等于当前计数器,则队列不再为空。如果是这样,特定线程离开条件循环, 从队列中弹出当前事件并增加计数器。
我怀疑有些通知丢失了,但我无法理解为什么会这样。任何想法如何解决此问题或如何以正确的方式实现此问题?
编辑 我将队列中的代码更改如下。现在它似乎工作了。 重要的部分是,我通知,同时仍然持有锁(在 push() 和 wait_and_pop() 中)。此外,我将票证系统更改为线程 ID 队列,但这只是为了方便,它可以保持源代码紧凑。但我不确定,如果我想使用队列 生产代码,因为我不明白为什么它现在可以工作,而且我不知道它是否适用于所有情况。也许有人可以对此发表评论?
template <typename T>
class Tqueue
{
public:
void push(const T& e){
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push(e);
m_cond.notify_all();
};
T wait_and_pop() {
std::unique_lock<std::mutex> lock(m_mutex);
m_ids.push(std::this_thread::get_id());
m_cond.wait(lock,[=]{return (!m_queue.empty())
&& (m_ids.front() == std::this_thread::get_id());});
T data = m_queue.front();
m_queue.pop();
m_ids.pop();
m_cond.notify_all();
return data;
}
private:
std::queue<T> m_queue;
std::queue<std::thread::id> m_ids;
std::mutex m_mutex;
std::condition_variable m_cond;
};
【问题讨论】:
-
出色的线程练习:)
-
请注意,您在超出临界区后进行打印。这意味着线程值将以随机顺序打印。
-
它现在可以工作了,因为您确保在更改线程可能正在等待的共享数据时唤醒线程。您是否仍然有一个您认为应该可以工作的不工作的版本?如果是这样,请更新,有人会解释为什么它不起作用。 (顺便说一句,你真的有一个公平比性能更重要的用例吗?这不像线程文件联合申诉。)
-
错误是你没有在
wait_and_pop中调用notify_all。在这种情况下,无论是否使用锁从push调用notify_all都没有区别(除了在大多数平台上使用锁调用它的效率稍高一些)。
标签: c++ multithreading stl notifications thread-safety