【发布时间】:2014-03-12 12:02:20
【问题描述】:
我正在尝试了解新 C++ 11 标准中的基本多线程机制。我能想到的最基本的例子如下:
- 生产者和消费者在不同的线程中实现
- 生产者将一定数量的项目放入队列中
- 消费者从队列中取出物品(如果有的话)
这个例子也在许多关于多线程的教科书中使用,关于通信过程的一切都很好。但是,在停止消费者线程时我遇到了问题。
我希望消费者一直运行,直到它得到一个明确的停止信号(在大多数情况下,这意味着我等待生产者完成,以便我可以在程序结束之前停止消费者)。不幸的是,C++ 11 线程缺乏中断机制(例如,我从 Java 中的多线程中知道)。因此,我必须使用isRunning 之类的标志来表示我希望线程停止。
现在的主要问题是:在我停止生产者线程后,队列为空,消费者正在等待condition_variable 以在队列再次填充时获取信号。所以我需要在退出之前通过在变量上调用notify_all()来唤醒线程。
我找到了一个可行的解决方案,但它似乎有点混乱。 下面列出了示例代码(我很抱歉,但不知何故,对于“最小”的最小示例,我无法进一步减小代码大小):
队列类:
class Queue{
public:
Queue() : m_isProgramStopped{ false } { }
void push(int i){
std::unique_lock<std::mutex> lock(m_mtx);
m_q.push(i);
m_cond.notify_one();
}
int pop(){
std::unique_lock<std::mutex> lock(m_mtx);
m_cond.wait(lock, [&](){ return !m_q.empty() || m_isProgramStopped; });
if (m_isProgramStopped){
throw std::exception("Program stopped!");
}
int x = m_q.front();
m_q.pop();
std::cout << "Thread " << std::this_thread::get_id() << " popped " << x << "." << std::endl;
return x;
}
void stop(){
m_isProgramStopped = true;
m_cond.notify_all();
}
private:
std::queue<int> m_q;
std::mutex m_mtx;
std::condition_variable m_cond;
bool m_isProgramStopped;
};
制作人:
class Producer{
public:
Producer(Queue & q) : m_q{ q }, m_counter{ 1 } { }
void produce(){
for (int i = 0; i < 5; i++){
m_q.push(m_counter++);
std::this_thread::sleep_for(std::chrono::milliseconds{ 500 });
}
}
void execute(){
m_t = std::thread(&Producer::produce, this);
}
void join(){
m_t.join();
}
private:
Queue & m_q;
std::thread m_t;
unsigned int m_counter;
};
消费者:
class Consumer{
public:
Consumer(Queue & q) : m_q{ q }, m_takeCounter{ 0 }, m_isRunning{ true }
{ }
~Consumer(){
std::cout << "KILL CONSUMER! - TOOK: " << m_takeCounter << "." << std::endl;
}
void consume(){
while (m_isRunning){
try{
m_q.pop();
m_takeCounter++;
}
catch (std::exception e){
std::cout << "Program was stopped while waiting." << std::endl;
}
}
}
void execute(){
m_t = std::thread(&Consumer::consume, this);
}
void join(){
m_t.join();
}
void stop(){
m_isRunning = false;
}
private:
Queue & m_q;
std::thread m_t;
unsigned int m_takeCounter;
bool m_isRunning;
};
最后是main():
int main(void){
Queue q;
Consumer cons{ q };
Producer prod{ q };
cons.execute();
prod.execute();
prod.join();
cons.stop();
q.stop();
cons.join();
std::cout << "END" << std::endl;
return EXIT_SUCCESS;
}
这是结束等待条件变量的线程的正确方法还是有更好的方法?目前,队列需要知道程序是否已经停止(我认为这会破坏组件的松散耦合),我需要在队列上显式调用stop(),这似乎不正确。
此外,如果队列为空,则应该用作信号的条件变量现在代表另一个条件 - 如果程序已经结束。如果我没记错的话,每次线程在条件变量上等待某个事件发生时,它还必须检查线程是否必须在继续执行之前停止(这似乎也是错误的)。
我有这些问题是因为我的整个设计有问题还是我缺少一些可以用来以干净方式退出线程的机制?
【问题讨论】:
-
这几乎就是我们在代码中所做的。设置一个“停止”变量并通知条件变量,并让它测试该标志作为它所做的第一件事。我们似乎没有找到一个简单、更优雅的通用解决方案。
-
您可以从队列的析构函数中调用
stop()。查看类似的解决方案stackoverflow.com/a/9711916/412080 -
你也可以使用哨兵 (c2.com/cgi/wiki?SentinelPattern)
-
Queue::m_isProgramStopped和Consumer::m_isRunning都应该是atomic<bool>或atomic_flag,因为main()在不受互斥锁保护的情况下同时写入它们。 -
@Praetorian Making
Queue::m_isProgramStoppedatomic 将使其免于数据竞争,但仍允许stop中的notify和pop中的wait之间的竞争。在pop中的线程检查m_isProgramStopped之后,stop到notify中的线程可能在条件变量上休眠,导致通知丢失并且主线程永远等待永远不会退出的消费者。更好的解决方案是使用m_mtx保护对m_isProgramStopped的所有访问。
标签: c++ multithreading c++11 interrupt condition-variable