【问题标题】:Multithreaded Producer/Consumer in C++C++中的多线程生产者/消费者
【发布时间】:2020-05-24 14:37:31
【问题描述】:

我正在研究多线程并编写了一个基本的生产者/消费者。我对下面写的生产者/消费者有两个问题。 1)即使将消费者睡眠时间设置为低于生产者睡眠时间,生产者似乎仍然执行得更快。 2)在消费者中,我复制了生产者完成添加到队列中的代码,但队列中仍有元素。对于构建代码的更好方法有什么建议吗?

#include <iostream>
#include <queue>
#include <mutex>

class App {
private:
    std::queue<int> m_data;
    bool m_bFinished;
    std::mutex m_Mutex;
    int m_ConsumerSleep;
    int m_ProducerSleep;
    int m_QueueSize;
public:
    App(int &MaxQueue) :m_bFinished(false), m_ConsumerSleep(1), m_ProducerSleep(5), m_QueueSize(MaxQueue){}
    void Producer() {

        for (int i = 0; i < m_QueueSize; ++i) {
            std::lock_guard<std::mutex> guard(m_Mutex);
            m_data.push(i); 
            std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(m_ProducerSleep));
        }
        m_bFinished = true;
    }

    void Consumer() {
        while (!m_bFinished) {
            if (m_data.size() > 0) {
                std::lock_guard<std::mutex> guard(m_Mutex);
                std::cout << "Consumer Thread, queue element: " << m_data.front() << " size: " << m_data.size() << std::endl;
                m_data.pop();
            }
            else {
                std::cout << "No elements, skipping" << std::endl;
            }
            std::this_thread::sleep_for(std::chrono::seconds(m_ConsumerSleep));
        }
        while (m_data.size() > 0) {
            std::lock_guard<std::mutex> guard(m_Mutex);
            std::cout << "Emptying remaining elements " << m_data.front() << std::endl;
            m_data.pop();
            std::this_thread::sleep_for(std::chrono::seconds(m_ConsumerSleep));
        }
    }

};


int main()
{
    int QueueElements = 10;
    App app(QueueElements);
    std::thread consumer_thread(&App::Consumer, &app);
    std::thread producer_thread(&App::Producer, &app);

    producer_thread.join();
    consumer_thread.join();


    std::cout << "loop exited" << std::endl;
    return 0;
}

【问题讨论】:

    标签: multithreading c++11 producer-consumer


    【解决方案1】:

    您应该使用condition_variable。不要对线程使用睡眠。

    主要方案: Producer 将 value 推入 lock 并发出 condition_variable 信号。

    消费者在条件变量的锁定下等待并检查谓词以防止虚假唤醒。

    我的版本:

    #include <iostream>
    #include <queue>
    #include <mutex>
    #include <thread>
    #include <condition_variable>
    #include <atomic>
    
    class App {
    private:
        std::queue<int> m_data;
        std::atomic_bool m_bFinished;
        std::mutex m_Mutex;
        std::condition_variable m_cv;
        int m_QueueSize;
    public:
        App(int MaxQueue) 
            : m_bFinished(false)
            , m_QueueSize(MaxQueue) 
        {}
    
        void Producer()
        {
            for (int i = 0; i < m_QueueSize; ++i) 
            {
                {
                    std::unique_lock<std::mutex> lock(m_Mutex);
                    m_data.push(i); 
                }
                m_cv.notify_one();
                std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
            }
            m_bFinished = true;
        }
    
        void Consumer() 
        {
            do
            {
                std::unique_lock<std::mutex> lock(m_Mutex);
                while (m_data.empty())
                {
                    m_cv.wait(lock, [&](){ return !m_data.empty(); }); // predicate an while loop - protection from spurious wakeups
                }
                while(!m_data.empty()) // consume all elements from queue
                {
                    std::cout << "Consumer Thread, queue element: " << m_data.front() << " size: " << m_data.size() << std::endl;
                    m_data.pop();
                }
            } while(!m_bFinished);
        }
    };
    
    
    int main()
    {
        int QueueElements = 10;
        App app(QueueElements);
        std::thread consumer_thread(&App::Consumer, &app);
        std::thread producer_thread(&App::Producer, &app);
    
        producer_thread.join();
        consumer_thread.join();
    
        std::cout << "loop exited" << std::endl;
        return 0;
    }
    

    另外注意,当你处理并发线程时,最好使用atomic作为结束标志,因为理论上m_bFinished的值将存储在缓存行中,如果没有缓存失效生产者线程,从消费者线程看不到更改的值。 Atomics 具有内存栅栏,可以保证该值将为其他线程更新。

    您也可以查看memory_order 页面。

    【讨论】:

    • 感谢您的回答,sleep的目的纯粹是为了模拟不同的处理延迟,不会成为最终代码的一部分。
    【解决方案2】:

    首先,您应该使用条件变量而不是消费者的延迟。这样,消费者线程只有在队列不为空并由生产者通知时才会唤醒。

    也就是说,您的生产者调用更频繁的原因是生产者线程的延迟。它是在持有互斥锁的同时执行的,因此消费者在延迟结束之前永远不会执行。你应该在调用sleep_for之前释放互斥锁:

    for (int i = 0; i < m_QueueSize; ++i) {
                /* Introduce a scope to release the mutex before sleeping*/
                {
                   std::lock_guard<std::mutex> guard(m_Mutex);
                    m_data.push(i); 
                    std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
                } // Mutex is released here
                std::this_thread::sleep_for(std::chrono::seconds(m_ProducerSleep));
            }
    

    【讨论】:

      猜你喜欢
      • 2018-09-24
      • 2017-02-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-11-17
      • 1970-01-01
      相关资源
      最近更新 更多