【问题标题】:fine-grained locking queue in c++C ++中的细粒度锁定队列
【发布时间】:2013-08-01 03:01:27
【问题描述】:

这是 Anthony Williams 在第 6.2.3 章 C++ Concurrency in Action 中介绍的细粒度锁定队列。

/*
    pop only need lock head_mutex and a small section of tail_mutex,push only need
    tail_mutex mutex.maximum container concurrency.
*/
template<typename T> class threadsafe_queue
{
    private:
    struct node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    }
    std::mutex head_mutex;   //when change the head lock it.
    std::unique_ptr<node> head;  
    std::mutex tail_mutex;   //when change the tail lock it.
    node* tail;
    std::condition_variable data_cond;

    node* get_tail()
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }

    public:
    /* 
        create a dummy node
    */
    threadsafe_queue():
        head(new node),tail(head.get())
    {}

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> head_lock;
        data_cond.wait(head_lock,[&]{return head.get()!=get_tail();}); //#1
        std::unique_ptr<node> old_head=std::move(head);
        head=std::move(old_head->next);
        return old_head;
    }

    void push(T new_value)
    {
        std::shared_ptr<T> new_data(
        std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> p(new node);
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            tail->data=new_data;
            node* const new_tail=p.get();
            tail->next=std::move(p);
            tail=new_tail;
        }
        data_cond.notify_one();
    }
}

情况如下:有两个线程(thread1thread2)。 thread1 正在执行 wait_and_popthread2 正在执行 push。队列为空。

thread1 在#2 中,在data_cond.wait() 之前已经检查过head.get()!=get_tail()。此时它的 CPU 周期已经用完。 thread2 开始。

thread2 完成了push 函数并执行了data_cond.notify_one()thread1 又开始了。

现在thread1 开始于data_cond.wait(),但它一直在等待。

这种情况会不会发生?如果会,如何修复这个容器?

【问题讨论】:

    标签: c++ multithreading concurrency stl


    【解决方案1】:

    是的,OP 中描述的情况是可能的,并且会导致通知丢失。在谓词函数中注入一个不错的大时间延迟可以很容易地触发。 Here's a demonstration at Coliru。请注意程序如何完成需要 10 秒(超时长度为wait_for)而不是 100 毫秒(生产者在队列中插入项目的时间)。通知丢失。

    在条件变量的设计中有一个隐含的假设,即当关联的互斥锁被锁定时,条件的状态(谓词的返回值)不能改变。对于此队列实现而言并非如此,因为 push 可以在不持有 head_mutex 的情况下更改队列的“空”。

    §30.5p3 指定 wait 具有三个原子部分:

    1. 互斥体释放,进入等待状态;
    2. 等待解除阻塞;和
    3. 重新获得锁。

    请注意,这些都没有提到对谓词的检查,如果有的话被传递给waitwait 带有谓词的行为在 §30.5.1p15 中有描述:

    效果:

    while (!pred())
          等待(锁定);

    请注意,这里不能保证谓词检查和wait 是原子执行的。 有一个前提条件是lock 被锁定,并且它是由调用线程持有的关联互斥体。

    至于修复容器以避免丢失通知,我会将其更改为单个互斥体实现并完成它。当 pushpop 最终都采用相同的互斥锁 (tail_mutex) 时,将其称为细粒度锁定有点牵强。

    【讨论】:

    • 是的,书中的代码有bug。很好的解释。
    【解决方案2】:

    data_cond.wait() 每次唤醒时都会检查条件。所以即使它可能已经被检查过了,它会在data_cond.notify_one()之后再次被检查。此时,有数据要弹出(因为线程 2 刚刚完成推送),所以它返回。阅读更多here

    您应该担心的唯一一件事是当您在一个空队列上调用 wait_and_pop 并且不再向其推送任何数据时。此代码没有超时等待和返回错误(或引发异常)的机制。

    【讨论】:

    • 我认为 data_cond.wait() 不是原子函数。看起来像 while(flag!=true) condition.wait().if thread1 有检查标志,但没有 condition.wait ().then thread2 begin,finished with a notify.thread1再次开始,陷入等待。在这种情况下会发生什么。
    • 请注意,等待谓词使用get_tail(),它获取tail_mutex 上的锁。没有竞争条件,因为谓词将在推送操作进行时阻塞。锁很聪明,如果一个被另一个阻塞,它的线程会在锁被释放时被唤醒。
    • 但是如果队列为空,thread1 已经完成 head.get()!=get_tail() 将等待条件变量,然后 thread2 开始 push 并在 thread1 执行等待之前完成。然后 thread1 一直等待,尽管队列不为空
    • 啊,现在我明白你在问什么了。如果发生这种情况,那么我希望等待应该立即返回,因为 data_cond 对象处于信号状态。但是,我不确定这一点。我建议您设置一个简单的测试场景,其中一个线程在另一个线程等待之前通知condition_variable
    • @joeymiu 我做了一些挖掘工作,发现了this old article about boost condition variablesthis StackOverflow question。据我所知,condition_variable 保证在谓词测试和解锁等待阶段之间是原子的,因此不可能错过通知。是的,通知只唤醒正在等待的线程,而不是尚未等待的线程。总之,您无需担心数据竞争。
    猜你喜欢
    • 1970-01-01
    • 2012-02-20
    • 1970-01-01
    • 2015-02-02
    • 1970-01-01
    • 1970-01-01
    • 2023-04-11
    • 1970-01-01
    • 2014-12-25
    相关资源
    最近更新 更多