【问题标题】:multithreaded program producer/consumer [boost]多线程程序生产者/消费者 [boost]
【发布时间】:2012-10-13 11:03:00
【问题描述】:

我正在使用 boost 库和 C++。我想创建一个包含生产者、消费者和堆栈的多线程程序。生产者填充堆栈,消费者从堆栈中删除项目(int)。一切正常(pop,push,mutex)但是当我在线程中调用pop/push时,我没有任何效果

我做了这个简单的代码:

#include "stdafx.h"
#include <stack>
#include <iostream>
#include <algorithm>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/date_time.hpp> 
#include <boost/signals2/mutex.hpp>
#include <ctime>

using namespace std;

/ *
* this class reprents a stack which is proteced by mutex
* Pop and push are executed by one thread each time.
*/
class ProtectedStack{
private : 
stack<int> m_Stack;
boost::signals2::mutex m;

public : 
ProtectedStack(){
}
ProtectedStack(const ProtectedStack & p){

}
void push(int x){
    m.lock();
    m_Stack.push(x);
    m.unlock();
}

void pop(){
    m.lock();
    //return m_Stack.top();
    if(!m_Stack.empty())
        m_Stack.pop();
    m.unlock(); 
}
int size(){
    return m_Stack.size();
}
bool isEmpty(){
    return m_Stack.empty();
}
int top(){
    return m_Stack.top();
}
};

/*
*The producer is the class that fills the stack. It encapsulate the thread object 
*/

class Producer{
public:
Producer(int number ){
    //create thread here but don't start here
m_Number=number;


}
void fillStack (ProtectedStack& s ) {
    int object = 3; //random value
    s.push(object);
    //cout<<"push object\n";
}

void produce (ProtectedStack & s){
    //call fill within a thread 
    m_Thread = boost::thread(&Producer::fillStack,this, s);  
}

 private :
int m_Number;
boost::thread m_Thread;

};


/* The consumer will consume the products produced by the producer */ 

class Consumer {
private : 
int m_Number;
boost::thread m_Thread;
public:
Consumer(int n){
    m_Number = n;
}

void remove(ProtectedStack &s ) {

     if(s.isEmpty()){ // if the stack is empty sleep and wait for the producer      to fill the stack
        //cout<<"stack is empty\n";
        boost::posix_time::seconds workTime(1); 
        boost::this_thread::sleep(workTime);
     }
     else{
        s.pop(); //pop it
        //cout<<"pop object\n";

     }

}

void consume (ProtectedStack & s){
    //call remove within a thread 
    m_Thread = boost::thread(&Consumer::remove, this, s);  
}

};


int main(int argc, char* argv[])  
{  



ProtectedStack s;


    Producer p(0);
    p.produce(s);

    Producer p2(1);
    p2.produce(s);

    cout<<"size after production "<<s.size()<<endl;
    Consumer c(0);
    c.consume(s);
    Consumer c2(1);
    c2.consume(s);
    cout<<"size after consumption  "<<s.size()<<endl;

getchar();
return 0;  
}  

在我在 VC++ 2010 / win7 中运行它之后 我有 : 0 0

能否请您帮助我理解为什么当我从 main 调用 fillStack 函数时我得到了效果,但是当我从线程调用它时却没有任何反应? 谢谢

【问题讨论】:

    标签: c++ multithreading boost-thread boost-mutex


    【解决方案1】:

    您的代码的主要问题是您的线程不同步。 请记住,默认情况下,线程执行是没有顺序的,也不是按顺序执行的,因此消费者线程实际上可以(并且在您的特定情况下是)在任何生产者线程产生任何数据之前完成。

    为确保消费者在生产者完成工作后运行,您需要在生产者线程上使用thread::join() 函数,它将停止主线程执行直到生产者退出:

    // Start producers
    ...
    
    p.m_Thread.join();  // Wait p to complete
    p2.m_Thread.join(); // Wait p2 to complete
    
    // Start consumers
    ...
    

    这可以解决问题,但这可能不适合典型的生产者-消费者用例。

    要实现更有用的案例,您需要修复消费者功能。 您的消费者函数实际上不会等待生成的数据,如果堆栈为空,它将退出,如果尚未生成数据,则永远不会消耗任何数据。

    应该是这样的:

    void remove(ProtectedStack &s)
    {
       // Place your actual exit condition here,
       // e.g. count of consumed elements or some event
       // raised by producers meaning no more data available etc.
       // For testing/educational purpose it can be just while(true)
       while(!_some_exit_condition_)
       {
          if(s.isEmpty())
          {
              // Second sleeping is too big, use milliseconds instead
              boost::posix_time::milliseconds workTime(1); 
              boost::this_thread::sleep(workTime);               
          }               
          else
          {
             s.pop();
          }
       }
    } 
    

    另一个问题是thread构造函数用法错误:

    m_Thread = boost::thread(&Producer::fillStack, this, s);  
    

    引用Boost.Thread documentation:

    带参数的线程构造函数

    template &lt;class F,class A1,class A2,...&gt; thread(F f,A1 a1,A2 a2,...);

    前提条件: F 和每个 An 必须是可复制或可移动的。

    效果: 好像thread(boost::bind(f,a1,a2,...))。因此, f 和每个 an 都被复制到 供新线程访问的内部存储

    这意味着您的每个线程都会收到自己的s 副本,并且所有修改都不会应用于s,而是应用于本地线程副本。当您按值将对象传递给函数参数时,情况也是如此。您需要通过引用传递 s 对象 - 使用 boost::ref:

    void produce(ProtectedStack& s)
    {
       m_Thread = boost::thread(&Producer::fillStack, this, boost::ref(s));
    }
    
    void consume(ProtectedStack& s)
    {
       m_Thread = boost::thread(&Consumer::remove, this, boost::ref(s));
    }  
    

    另一个问题是关于互斥锁的使用。这不是最好的。

    1. 为什么要使用 Signals2 库中的互斥锁?只需使用 Boost.Thread 中的 boost::mutex 并删除对 Signals2 库的不需要的依赖即可。

    2. 使用 RAII 包装器 boost::lock_guard 而不是直接调用 lock/unlock

    3. 正如其他人所说,你应该用锁保护ProtectedStack的所有成员。

    示例:

    boost::mutex m;
    
    void push(int x)
    { 
       boost::lock_guard<boost::mutex> lock(m);
       m_Stack.push(x);
    } 
    
    void pop()
    {
       boost::lock_guard<boost::mutex> lock(m);
       if(!m_Stack.empty()) m_Stack.pop();
    }              
    
    int size()
    {
       boost::lock_guard<boost::mutex> lock(m);
       return m_Stack.size();
    }
    
    bool isEmpty()
    {
       boost::lock_guard<boost::mutex> lock(m);
       return m_Stack.empty();
    }
    
    int top()
    {
       boost::lock_guard<boost::mutex> lock(m);
       return m_Stack.top();
    }
    

    【讨论】:

    • 感谢您的回复。我按照您的说法更改了代码,但结果仍然相同。实际上问题似乎是由线程调用引起的。当我写 p.fillStack 时,我把堆栈填满了。当我从 boost::thread 调用 p.fillStack 时,什么也没有发生。这对我来说似乎很奇怪,因为我是 c++/boost 的初学者。
    • @ezzakrem 看起来我找到了问题的根本原因。看看更新的答案。
    • 1 非常感谢!它工作得很好我已经使用了指针而不是 refs,但是使用 boost::ref 它可以完美地工作,特别是通过将 boost::mutex 与 lock_gard 一起使用。您的建议对我很有帮助,再次感谢您。
    【解决方案2】:

    您的示例代码遇到了其他人指出的几个同步问题:

    • 对 ProtectedStack 的某些成员的调用缺少锁。
    • 主线程可能会在不允许工作线程加入的情况下退出。
    • 生产者和消费者不会像您期望的那样循环。生产者应始终(在可能的情况下)进行生产,而消费者应在新元素被推入堆栈时继续消费。
    • 主线程上的 cout 可能会在生产者或消费者有机会工作之前很好地执行。

    我建议您考虑使用条件变量在您的生产者和消费者之间进行同步。看看这里的生产者/消费者示例:http://en.cppreference.com/w/cpp/thread/condition_variable 从 C++11 开始,它是标准库中的一个相当新的功能,并且从 VS2012 开始支持。在 VS2012 之前,您需要 boost 或使用 Win32 调用。

    使用条件变量来解决生产者/消费者问题很好,因为它几乎强制使用互斥锁来锁定共享数据,并且它提供了一种信号机制让消费者知道某些东西已经准备好被消费,所以他们不会如此旋转(这始终是消费者的响应能力和轮询队列的 CPU 使用率之间的权衡)。它本身也是原子的,这可以防止线程丢失信号的可能性,如这里解释的那样:https://en.wikipedia.org/wiki/Sleeping_barber_problem

    简要介绍条件变量如何处理此问题...

    • 生产者在其线程上执行所有耗时的活动而不拥有互斥锁。
    • 生产者锁定互斥体,将其生成的项目添加到全局数据结构(可能是某种队列),释放互斥体并发出信号通知单个消费者去 - 按此顺序。
    • 等待条件变量的消费者自动重新获取互斥锁,将项目从队列中移除并对其进行一些处理。在此期间,生产者已经在生产新商品,但必须等到消费者完成后才能将商品排队。

    这将对您的代码产生以下影响:

    • 不再需要 ProtectedStack,一个普通的堆栈/队列数据结构就可以了。
    • 如果您使用的是足够新的编译器,则无需提升 - 删除构建依赖项总是一件好事。

    我觉得线程对您来说相当新,所以我只能提供一些建议,看看其他人是如何解决同步问题的,因为您很难完全理解。对多线程和共享数据的环境中发生的事情感到困惑通常会导致诸如死锁之类的问题。

    【讨论】:

      【解决方案3】:

      在尝试消费之前,您没有检查生产线程是否已执行。您也没有锁定 size/empty/top...如果容器正在更新,那是不安全的。

      【讨论】:

      • 感谢您的回复。实际上,如果消费者发现一个空堆栈,我添加了一个 sleep(1) 。是的,我没有保护大小/空/顶部的想法。
      猜你喜欢
      • 2017-02-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-10-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多