【问题标题】:consumer/producer in c++C++中的消费者/生产者
【发布时间】:2012-03-12 20:50:31
【问题描述】:

这是一个经典的 c/p 问题,其中一些线程产生数据,而其他线程读取数据。生产者和消费者都共享一个 const 大小的缓冲区。如果缓冲区为空,则消费者必须等待,如果缓冲区已满,则生产者必须等待。我正在使用信号量来跟踪已满或空的队列。生产者将减少空闲点信号量,增加值,并增加填充槽信号量。所以我试图实现一个程序,从生成器函数中获取一些数字,然后打印出这些数字的平均值。通过将此视为生产者 - 消费者问题,我试图在程序的执行中节省一些时间。 generateNumber 函数会导致进程出现一些延迟,因此我想创建多个生成数字的线程,并将它们放入队列中。然后运行主函数的“主线程”必须从队列中读取并找到总和然后平均。所以这是我目前所拥有的:

#include <cstdio> 
#include <cstdlib>
#include <time.h>
#include "Thread.h" 
#include <queue> 

int generateNumber() {
    int delayms = rand() / (float) RAND_MAX * 400.f + 200;
    int result = rand() / (float) RAND_MAX * 20;
    struct timespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = delayms * 1000000;
    nanosleep(&ts, NULL);
    return result; }


struct threadarg {
    Semaphore filled(0);
    Semaphore empty(n);
    std::queue<int> q; };


void* threadfunc(void *arg) {
    threadarg *targp = (threadarg *) arg;
    threadarg &targ = *targp;
    while (targ.empty.value() != 0) {
        int val = generateNumber();
        targ.empty.dec(); 
        q.push_back(val);
        targ.filled.inc(); }
}
int main(int argc, char **argv) {
    Thread consumer, producer;
    // read the command line arguments
    if (argc != 2) {
        printf("usage: %s [nums to average]\n", argv[0]);
        exit(1); }
    int n = atoi(argv[1]);
    // Seed random number generator
    srand(time(NULL));
}

我现在有点困惑,因为我不知道如何创建多个生产者线程来生成数字(如果 q 未满),而消费者正在从队列中读取(即如果 q 不为空)。我不知道应该把什么放在主要的地方来实现它。 同样在“Thread.h”中,您可以创建线程、互斥体或信号量。线程有 .run(threadFunc, arg)、.join() 等方法。可以锁定或解锁互斥体。我的代码中都使用了信号量方法。

【问题讨论】:

  • 嗨,丹,您不接受任何给您的答案。请激励社区回答您的问题。
  • 我很抱歉直到现在我才意识到这是一个选项!我接受了我之前提出的所有问题的答案。
  • 非常感谢您的回复。然而,与其说是我苦苦挣扎的代码,我只是不确定在哪里定义什么,尤其是与消费者有关的内容。
  • 在您发布的代码中,您有一个threadfunc,它是生产者:只需将其重命名为producer 并编写另一个名为consumer 的函数。 producer 已经在推送它的产品,所以没关系 - 你只需要将它绑定到 Thread producer 以便它实际运行。如果您需要多个生产者,请在多个 Thread 对象中运行相同的函数。 consumer 函数需要 pop 在循环中进行计算。最后,您需要弄清楚生产者和消费者如何知道何时停止!

标签: c++ c concurrency mutex producer-consumer


【解决方案1】:
#include<iostream>
#include<deque>
#include<mutex>
#include<chrono>
#include<condition_variable>
#include<thread>
using namespace std;
mutex mu,c_out;
condition_variable cv;
class Buffer
{
public:
    Buffer() {}
    void add(int ele)
    {
        unique_lock<mutex> ulock(mu);
        cv.wait(ulock,[this](){return q.size()<_size;});
        q.push_back(ele);
        mu.unlock();
        cv.notify_all();
        return;
    }
    int remove()
    {
     unique_lock<mutex> ulock(mu);
     cv.wait(ulock,[this](){return q.size()>0;});
     int v=q.back();
     q.pop_back();
     mu.unlock();
     cv.notify_all();
     return v;
    }
    int calculateAvarage()
    {
        int total=0;
        unique_lock<mutex> ulock(mu);
        cv.wait(ulock,[this](){return q.size()>0;});
        deque<int>::iterator it = q.begin();
        while (it != q.end())
        {
            total += *it;
            std::cout << ' ' << *it++;
        }
        return total/q.size();
    }
private:
    deque<int> q;
    const unsigned int _size=10;
};
class Producer
{
public:
    Producer(Buffer *_bf=NULL)
    {
        this->bf=_bf;
    }
    void Produce()
    {
        while(true)
        {
            int num=rand()%10;
            bf->add(num);
            c_out.lock();
            cout<<"Produced:"<<num<<"avarage:"<<bf->calculateAvarage()<<endl;
            this_thread::sleep_for(chrono::microseconds(5000));
            c_out.unlock();
        }
    }
private:
    Buffer *bf;
};
class Consumer
{
public:
    Consumer(Buffer *_bf=NULL)
    {
        this->bf=_bf;
    }
    void Consume()
    {
        while (true)
        {
            int num=bf->remove();
            c_out.lock();
            cout<<"Consumed:"<<num<<"avarage:"<<bf->calculateAvarage()<<endl;
            this_thread::sleep_for(chrono::milliseconds(5000));
            c_out.unlock();
        }
    }
private:
    Buffer *bf;
};
int main()
{
    Buffer b;
    Consumer c(&b);
    Producer p(&b);
    thread th1(&Producer::Produce,&p);
    thread th2(&Consumer::Consume,&c);
    th1.join();
    th2.join();
    return 0;
}

Buffer 类有双倍队列,最大 Buffer 大小为 10。 它有两个功能可以添加到队列中和从队列中删除。 Buffer类有calculateAvarage()函数,它会计算一个元素被添加或删除的平均时间。

还有两个类,一个是生产者和消费者,它们具有 buffwr 类指针。 我们在消费者类中有 Consume(),在 Producer 类中有 Produce()。 Consume()>>锁定缓冲区并检查缓冲区的大小是否不为0,它将从缓冲区中删除并通知生产者并解锁。 Produce()>>锁定缓冲区并检查缓冲区大小是否不是最大缓冲区大小,它将添加并通知消费者并解锁。

【讨论】:

    【解决方案2】:

    使用互斥锁保护队列访问,应该就是这样。一个“计算机科学 101”有界生产者-消费者队列需要两个信号量(管理空闲/空计数和生产者/消费者等待,正如您已经在做的那样)和一个 mutex/futex/criticalSection 来保护队列.

    我看不出用 condvars 替换信号量和互斥量有多大帮助。重点是什么?您如何使用 condvar 实现有界生产者-消费者队列,该队列适用于具有多个生产者/消费者的所有平台?

    【讨论】:

      【解决方案3】:

      像这样管理共享状态时,您需要一个条件变量和 互斥体。基本模式是一个函数,如下所示:

      ScopedLock l( theMutex );
      while ( !conditionMet ) {
          theCondition.wait( theMutex );
      }
      doWhatever();
      theCondition.notify();
      

      在你的情况下,我可能会创建条件变量和互斥锁 实现队列的类的成员。要写, conditionMet 将是 !queue.full(),所以你最终会得到一些东西 喜欢:

      ScopedLock l( queue.myMutex );
      while ( queue.full() ) {
          queue.myCondition.wait();
      }
      queue.insert( whatever );
      queue.myCondition.notify();
      

      阅读:

      ScopedLock l( queue.myMutex );
      while ( queue.empty() ) {
          queue.myCondition.wait();
      }
      results = queue.extract();
      queue.myCondition.notify();
      return results;
      

      根据线程接口的不同,可能有两个notify 功能:通知一个(唤醒单个线程),并通知所有 (唤醒所有等待的线程);在这种情况下,您需要 通知所有(或者你需要两个条件变量,一个用于空间 写入,一个用于读取,每个函数都在等待一个, 但通知对方)。

      【讨论】:

        【解决方案4】:

        您的队列未同步,因此多个生产者可以同时调用push_back,或者消费者同时调用pop_front ...这将中断。

        完成这项工作的简单方法是使用线程安全队列,它可以是您已有的 std::queue 的包装器,外加一个互斥体。

        您可以首先添加一个互斥体,然后在您转发到std::queue 的每个呼叫周围锁定/解锁它 - 对于单个消费者应该足够了,对于多个消费者,您需要融合 front() 和 @987654326 @ 到单个同步调用中。

        要让消费者在队列为空时阻塞,您可以在包装器中添加一个条件变量。

        这应该足够你可以在网上找到答案了——下面的示例代码。


        template <typename T> class SynchronizedQueue
        {
            std::queue<T> queue_;
            std::mutex mutex_;
            std::condition_variable condvar_;
        
            typedef std::lock_guard<std::mutex> lock;
            typedef std::unique_lock<std::mutex> ulock;
        
        public:
            void push(T const &val)
            {
                lock l(mutex_); // prevents multiple pushes corrupting queue_
                bool wake = queue_.empty(); // we may need to wake consumer
                queue_.push(val);
                if (wake) condvar_.notify_one();
            }
        
            T pop()
            {
                ulock u(mutex_);
                while (queue_.empty())
                    condvar_.wait(u);
                // now queue_ is non-empty and we still have the lock
                T retval = queue_.front();
                queue_.pop();
                return retval;
            }
        };
        

        用你的“Thread.h”给你的任何原语替换std::mutex等。

        【讨论】:

        • 关于消费者区块的倒数第二行是什么意思?
        • 假设你的消费者会调用​​pop()来获取下一个结果:如果队列是空的,这应该阻塞直到生产者添加一些东西,然后返回它;示例代码来了。
        【解决方案5】:

        我会这样做:

        • 创建一个隐藏队列的数据类
        • 创建线程安全的访问器方法,用于将一段数据保存到 q,并从 q 中删除一段数据(我会使用单个互斥体,或访问器的临界区)
        • 处理消费者没有任何数据可使用的情况(睡眠)
        • 处理 q 变得太满,生产者需要放慢速度的情况
        • 让线程在生产/消费时随意添加和删除

        另外,请记住在每个线程中添加睡眠,否则您将锁定 CPU,而不会为线程调度程序提供一个切换上下文并与其他线程/进程共享 CPU 的好地方。您不需要这样做,但这是一个很好的做法。

        【讨论】:

        • 我会投票,但你的建议并没有澄清这个问题。
        • 实际上,像这样更改架构本质上可以解决问题。
        • ...除非您的问题不是并发问题,而是创建可以运行多个线程的代码...
        • “记得在每个线程中添加一个睡眠”:它对我有很大帮助。谢谢
        猜你喜欢
        • 1970-01-01
        • 2015-12-14
        • 2018-09-24
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多