你说你学过操作系统这门课?写个无Bug的生产者和消费者模型试试!

                              ——你真的学好了操作系统这门课嘛?

 

在第壹章,展示过这样图:

从零开始山寨Caffe·陆:IO系统(一)

其中,左半部分构成了新版Caffe最恼人、最庞大的IO系统。

也是历来最不重视的一部分。

第伍章又对左半部分的独立性进行了分析,我是这么描述到:

Datum和Blob(Batch)不是上下文相关的。

Blob包含着正向传播的shape信息,这些信息只有初始化网络在初始化时才能确定。

而Datum则只是与输入样本有关。

所以,Datum的读取工作可以在网络未初始化之前就开始,这就是DataReader采用线程设计的内涵。

所以,左半部分又可以分为左左半部分,和左右半部分。

阻塞队列

生产者与消费者

第伍章讲到,在一个机器学习系统中,生产者和消费者的执行周期是不一样的。

为了平衡在周期上的差异,节约计算资源,我们显然需要对生产者做一定限制。

存储生产资源,可以用数组,也可以用STL容器。

再考虑生产者和消费者的行为:

①不存在随机访问:

显然,消费者是按照固定顺序访问缓冲区的。

我们没有必要考虑随机访问的情况。

②不存在随机写入:

显然,生产者每次只需要将资源放置于缓冲区两端。

我们没有必要考虑在线性表中间位置写入的情况。

由于vector底层由顺序表实现,其访问速度随着元素数量的递增而递减,

而queue底层由链式表实现,其访问速度不随元素数量的递增而递减,且没有随机写入/访问的情况。

所以,选择queue作为缓冲区是比较优异的。

为了限制生产者的行为,我们需要在STL提供的queue基础上,改进出一种新的数据结构——Blocking Queue。

互斥锁

第肆章简单提到了mutex问题,这是阻塞队列除了Blocking之外,需要考虑的第二大问题。

并且已经证明了:生产者和消费者之间必然是异步的。

我们以队列的push和pop操作为例,分析一下,为什么在多线程情况下,需要加mutex。

假设线程A预备执行push操作,所以它是一个生产者;

假设线程B预备执行pop操作,所以它是一个消费者;

设有临界缓冲区队列Q,在某时刻T,线程A发出push操作,在T+1时候,线程B发出pop操作,

且push需要10个单位时间,pop只需要一个单位时间,问T+2时刻,pop出去的资源你敢用嘛?

显然,没人敢用这个执行push的半成品。

发生上述问题的症结在于,两个异步线程对于同一个资源,产生了争夺行为。

解决方案就是:在push时,锁住资源,禁止pop;在pop时,锁住资源,禁止push。

广义上,我们可以认为,需要将push和pop函数变成原子函数,即:执行期间不可中断的函数。

———————————————————————————————————————————————————————————

另外,需要注意的是,mutex与blocking是两个概念。

在广义上,mutex会将多个线程对同一个资源的异步并行操作,拉成一个串行执行队列,串行等待执行。

而blocking则是将线程休眠,CPU会暂时放弃对其控制。

在程序员界,虽然有时候会把mutex和blocking都称为阻塞,但其原理和内涵是完全不同的。

———————————————————————————————————————————————————————————

boost提供不俗的mutex功能,使用前需要 #include "boost/thread/mutex.hpp"

你可以将一个boost::mutex对象嵌入到一个类当中,这样,允许每一个类对象拥有一把锁。

由于对一个queue对象,主要是锁住来自该对象的push和pop操作,

所以,mutex理应当是以类对象为一个单位的,参考代码如下:

template <typename T>
class BlockingQueue{
public:
    void push(const T& t){
        boost::mutex::scoped_lock lock(mutex);
        Q.push(t);
    }
    T pop(){
        boost::mutex::scoped_lock lock(mutex);
        T t = Q.front();
       Q.pop();
    return t;
    }
private:
    boost::mutex mutex;
    queue<T> Q;
};    

boost::mutex::scoped_lock lock提供局部锁定功能。

它与boost::scoped_ptr有类似的效果,scoped_ptr在作用域结束后,就立即释放对象。

scoped_lock在作用域结束后,会立即解锁,如果不用scoped_lock,我们可以这么写:

void push(const T& t){
        mutex.lock();
        Q.push(t);
        mutex.unlock();
}

条件阻塞与激活

前面几章说了那么久的阻塞,其中大部分指的应该是blocking。

mutex大部分情况下,都只是在锁一个局部函数,阻塞周期非常短。

唯一的例外是Layer的正向传播函数forward,mutex锁住的周期非常长。

blocking和mutex的唯一不同在于:

blocking之后,操作系统会唆使CPU放弃对线程的处理。

这是非常危险的一个行为,因为该线程被家长赶去睡觉了,而且不能反抗家长的命令。

除非家长通知它:噢,你可以活动了。在此之前,该线程将永远处于无效状态。

上面的例子有两个重点:

①CPU放弃线程

②不可主动激活

既然如此,为了激活这个线程,模型就必须设计成“对偶模型”,而生产者和消费者,恰恰正是对偶的。

———————————————————————————————————————————————————————————

boost::condition_variable提供了简单的blocking功能,为了统一控制,可以将其与mutex捆在一起:

template <typename T>
class BlockingQueue
{
public:
    class Sync{
    public:
        boost::mutex mutex;
        boost::condition_variable condition;
    };
private:
    queue<T> Q;
    boost::shared_ptr<Sync> sync;
};

现在考虑一下,何时需要注销、阻塞一个线程,大致有两种情况:

①缓冲区空,此时消费者不能消费,拒绝pop操作之后,可以交出CPU控制权。

②缓冲区满,此时生产者不能生产,拒绝push操作之后,可以交出CPU控制权。

为了激活彼此,就需要模型是对偶的:

①经历缓冲区空之后,突然push了一个元素,此时应当由生产者激活消费者线程。

②经历缓冲区满之后,突然pop了一个元素,此时应当由消费者激活生产者线程。

看起来,我们可以将代码写成这样:

void BlockingQueue<T>::push(const T& t){
    boost::mutex::scoped_lock lock(sync->mutex);
    while (Q.full()){
        sync->condition.wait(lock); //suspend, spare CPU clock
    }
    Q.push(t);
    sync->condition.notify_one();
}

template<typename T>
T BlockingQueue<T>::pop(const string& log_waiting_msg){
    boost::mutex::scoped_lock lock(sync->mutex);
    while (Q.empty()){
        sync->condition.wait(lock); //suspend, spare CPU clock
    }
    T t = Q.front();
    Q.pop();
    sync->condition.notify_one();
    return t;
}

其中,sync->condition.wait(lock)表示使用当前mutex为标记,交出CPU控制权。

sync->condition.notify_one()则表示激活一个线程的CPU控制权。

可以看到,blocking和activating的代码是完全对偶的,blocking自己,activating对方。

双阻塞队列

上节代码是不可能实现的,因为没有Q.full()这个函数。

在传统生产者、消费者程序中,通常会使用单缓冲队列。

使用单缓冲队列是没有问题的,因为在这种简单的代码结构中,我们很容易知道缓冲队列的上界。

比如,设定缓冲队列大小为20,在编程中,可以通过检测 if(count==20)来达到。

当代码结构复杂时,比如,缓冲队列大小变量通常在非常上层上层上层的位置,而处于底层的缓冲队列,

是无法探知何谓“缓冲队列满”的含义的,这就为编程带来很大的难题。

———————————————————————————————————————————————————————————

解决方案是,使用双缓冲队列组方案,我们设定两个阻塞队列,一个叫free,一个叫full。

两者组成一个QueuePair:

class QueuePair{
public:
    QueuePair(const int size);
    ~QueuePair();
    BlockingQueue<Datum*> free; // as producter queue
    BlockingQueue<Datum*> full; // as consumer queue
};

为了避免检测缓冲队列的上界,我们可以先放置与上界数量等量的空元素指针到free队列。

每次生产者生产时,从free队列中pop一个空Datum元素,填充,再扔进full队列。

这样,BlockingQueue的push操作就不需要检测上界了。

原理很简单,生产者想要push,之前必须pop,pop可以通过检测缓冲队列空来实现。

这样,就用检测一个缓冲队列的空,模拟且替代了检测另一个缓冲队列的满。

对于上层代码而言,我们仅仅需要预先填充N个元素至free队列中即可,非常方便。

这部分是DataReader的设计核心。

代码实战

★数据结构

———————————————————————————————————————————————————————————

建立blocking_queue.hpp。

template <typename T>
class BlockingQueue
{
public:
    BlockingQueue();
    void push(const T& t); 
    T pop(const string& log_waiting_msg="");
    T peek();
    size_t size();
    // try_func return false when need blocking
    // try_func for destructor
    bool try_pop(T* t);
    bool try_peek(T* t);
    class Sync{
    public:
        boost::mutex mutex;
        boost::condition_variable condition;
    };
private:
    queue<T> Q;
    boost::shared_ptr<Sync> sync;
};
★class BlockingQueue

相关文章:

  • 2021-09-08
  • 2021-04-17
  • 2022-01-27
  • 2021-07-21
  • 2021-07-16
  • 2021-09-17
猜你喜欢
  • 2021-06-21
  • 2022-02-19
  • 2021-12-10
  • 2022-01-13
  • 2021-07-10
  • 2022-02-26
相关资源
相似解决方案