【问题标题】:concurrent_queue memory consumption exploded, then program crashedconcurrent_queue 内存消耗爆炸,然后程序崩溃
【发布时间】:2013-07-23 05:56:45
【问题描述】:

这是使用VS 2010并发队列的典型生产者/消费者模式,问题是当我运行程序时,内存消耗超过1GB然后程序崩溃,有人可以指出这段代码中的问题吗?

#include <iostream>
#include <fstream>
#include <string>
#include <cstdlib> 
#include <ctime> 

#include <boost\shared_ptr.hpp>
#include <boost\thread.hpp>
#include <concurrent_queue.h>



void wait2(int milliseconds)
{
    boost::this_thread::sleep(boost::posix_time::milliseconds(milliseconds)); 
}

class CQueue
{
    Concurrency::concurrent_queue<int>  Q;

    boost::mutex                m;
    boost::condition_variable   cv;

public:

    CQueue():QValue(-1)
    {
    }

    int QRead()
    {
        while(Q.empty())
        {
            boost::unique_lock<boost::mutex> lk(m);
            cv.wait(lk);
        }

        int res;
        if(Q.try_pop(res))
        {
            QValue = res;
            return true;
        }
        return false;
    }

    void QWrite(int i)
    {
        Q.push(i);
        cv.notify_one();
    }

    int QValue;
};

CQueue myqueue;

void write()
{
    int i = 0;
    while(true)
    {
        myqueue.QWrite(++i);
    }
}


void read()
{
    while(true)
    {
        if( myqueue.QRead())
            std::cout << myqueue.QValue << std::endl;
        else
            std::cout << "failed to read" << std::endl;
    }
}
void main ()
{

    boost::thread w(write);
    boost::thread r(read);

    w.join();
    r.join();

}

【问题讨论】:

  • 没有明显的死锁。可能由于读取器正在执行 I/O,写入器填充队列的速度比读取器消耗队列的速度要快得多。
  • 你为什么把unique_lock放在一个范围内,你知道你会在锁关闭的时候从队列中读取,对吧?既然已经有了 concurrent_queue,为什么还要做这些事情?
  • @Casey 你应该是正确的,因为通常生产者/消费者模式需要指定每个消费者的容量(他们的输入 fifo 的阈值),以便阻止生产者,直到有人准备好或有存储空间。
  • @Casey,该程序在大约 77k 次插入时失败,这并不大,所以我不认为这是个问题。
  • @yngum、unique_lock 和 condition_variable 是在队列为空时挂起线程以避免繁忙 cpu 所必需的。

标签: c++ multithreading boost queue


【解决方案1】:

我在一个简单的双核上使用 VS'13 和 Boost 1.52 构建并测试了您的代码。

正如已经说过的,由于您的生产者-消费者设计没有定义一个阈值来在库存(并发队列)达到给定水平时阻止生产者,生产者在队列中推送了太多数据,因此内存增加,窗口开始交换,冻结,如果超过最大提交大小,进程可能会崩溃等等......

请注意,提交大小限制取决于几个因素,编译器、编译器选项、程序运行的操作系统……

所以在下面我添加了一种方法,如果队列大小达到阈值,则阻塞生产者,如果队列大小低于阈值,则消费者唤醒生产者。

通过这些更改,我们添加了一些同步,这可能会限制并行性,但使用中的内存是受控的。

#include <iostream>
#include <fstream>
#include <string>
#include <cstdlib> 
#include <ctime> 

#include "..\..\..\boost\boost\shared_ptr.hpp"
#include "..\..\..\boost\boost\thread.hpp"

#include <concurrent_queue.h>

#define STOCK_THRESHOLD 1000

void wait2(int milliseconds)
{
    boost::this_thread::sleep(boost::posix_time::milliseconds(milliseconds)); 
}

class CQueue
{
    Concurrency::concurrent_queue<int>  Q;

    boost::mutex                consumerMutex;
    boost::condition_variable   consumerCV;

    boost::mutex                producerMutex;
    boost::condition_variable   producerCV;

public:

    CQueue():QValue(-1)
    {
    }

    int QRead()
    {
        while(Q.empty())
        {
            boost::unique_lock<boost::mutex> lk(consumerMutex);
            consumerCV.wait(lk);
        }

        int res;
        if(Q.try_pop(res))
        {
            QValue = res;
            if(Q.unsafe_size() <= STOCK_THRESHOLD)
            {
                producerCV.notify_one();
            }
            return true;
        }
        return false;
    }

    void QWrite(int i)
    {
        while(Q.unsafe_size() > STOCK_THRESHOLD){
            boost::unique_lock<boost::mutex> lk(producerMutex);
            producerCV.wait_for(lk, boost::chrono::milliseconds(10));
        }
        Q.push(i);
        consumerCV.notify_one();
    }

    int QValue;
};

CQueue myqueue;

void write()
{
    int i = 0;
    while(true)
    {
        myqueue.QWrite(++i);

    }
}


void read()
{
    while(true)
    {
        if( myqueue.QRead())
            std::cout << myqueue.QValue << std::endl;
        else
            std::cout << "failed to read" << std::endl;
    }
}

void main ()
{

    boost::thread w(write);
    boost::thread r(read);

    w.join();
    r.join();

}

【讨论】:

    【解决方案2】:

    代码丢失了来自您的条件变量的通知,因此您的消费者线程睡眠时间过长,因此消耗的速度不够快。

    想象一下可以想象的线程序列:

        Producer                       Consumer
    --+-----------------------------+-------------------------------------------------------
    1 |                             |  while(Q.empty())
    2 |   Q.push(i);                |  boost::unique_lock<boost::mutex> lk(consumerMutex);
    3 |   consumerCV.notify_one();  |
    4 |                             |  consumerCV.wait(lk); // notification from 3 gets lost
    

    要修复互斥锁,必须在 consumerCV.notify_one() 之前在生产者中发出条件信号并在 Q.empty() 之前检查消费者中的队列状态时保持互斥锁。

    您可以通过注释掉所有互斥锁和条件变量调用并将使用者更改为忙等待(如while(Q.empty()) /* busy-wait */;)来轻松检查。

    如果concurrent_queue 不提供等待项目可用的功能,则最好使用包裹在互斥锁中的非线程安全容器。因为它仍然需要一个互斥锁和一个条件变量来正确通知使用无锁或无等待容器获得的好处。

    另外,由于生产者只使用++i 进行生产,而消费者通过打印每个值来完成更多工作,消费者可能无法跟上生产者的速度,从而导致队列增加并最终耗尽内存.

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-03-23
      • 1970-01-01
      • 1970-01-01
      • 2020-04-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多