【问题标题】:C++ Lock free producer/consumer queueC++无锁生产者/消费者队列
【发布时间】:2011-10-26 23:02:35
【问题描述】:

我正在查看无锁队列的示例代码:

http://drdobbs.com/high-performance-computing/210604448?pgno=2

(在许多 SO 问题中也有参考,例如Is there a production ready lock-free queue or hash implementation in C++

这看起来应该适用于单个生产者/消费者,尽管代码中有许多拼写错误。我已将代码更新为如下所示,但它让我崩溃了。有人有什么建议吗?

特别是,应该将divider 和last 声明为:

atomic<Node *> divider, last;    // shared

我在这台机器上没有支持 C++0x 的编译器,所以也许这就是我所需要的...

// Implementation from http://drdobbs.com/high-performance-computing/210604448
// Note that the code in that article (10/26/11) is broken.
// The attempted fixed version is below.

template <typename T>
class LockFreeQueue {
private:
    struct Node {
        Node( T val ) : value(val), next(0) { }
        T value;
        Node* next;
    };
    Node *first,      // for producer only
    *divider, *last;    // shared
public:
    LockFreeQueue()
    {
        first = divider = last = new Node(T()); // add dummy separator
    }
    ~LockFreeQueue()
    {
        while( first != 0 )    // release the list
        {
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }
    void Produce( const T& t )
    {
        last->next = new Node(t);    // add the new item
        last = last->next;      // publish it

        while (first != divider) // trim unused nodes
        {
            Node* tmp = first;
            first = first->next;
            delete tmp;
        }
    }
    bool Consume( T& result )
    {
        if (divider != last)         // if queue is nonempty
        {
            result = divider->next->value; // C: copy it back
            divider = divider->next;      // D: publish that we took it
            return true;                  // and report success
        }
        return false;                   // else report empty
    }
};

我编写了以下代码来测试这一点。 Main(未显示)只是调用 TestQ()。

#include "LockFreeQueue.h"

const int numThreads = 1;
std::vector<LockFreeQueue<int> > q(numThreads);

void *Solver(void *whichID)
{
    int id = (long)whichID;
    printf("Thread %d initialized\n", id);
    int result = 0;
    do {
        if (q[id].Consume(result))
        {
            int y = 0;
            for (int x = 0; x < result; x++)
            { y++; }
            y = 0;
        }
    } while (result != -1);
    return 0;
}


void TestQ()
{
    std::vector<pthread_t> threads;
    for (int x = 0; x < numThreads; x++)
    {
        pthread_t thread;
        pthread_create(&thread, NULL, Solver, (void *)x);
        threads.push_back(thread);
    }
    for (int y = 0; y < 1000000; y++)
    {
        for (unsigned int x = 0; x < threads.size(); x++)
        {
            q[x].Produce(y);
        }
    }
    for (unsigned int x = 0; x < threads.size(); x++)
    {
        q[x].Produce(-1);
    }
    for (unsigned int x = 0; x < threads.size(); x++)
        pthread_join(threads[x], 0);    
}

更新:最终崩溃是由队列声明引起的:

std::vector<LockFreeQueue<int> > q(numThreads);

当我将它更改为一个简单的数组时,它运行良好。 (我实现了一个带锁的版本,它也崩溃了。)我看到析构函数在构造函数之后立即被调用,导致内存双重释放。但是,有谁知道为什么会立即使用 std::vector 调用析构函数?

【问题讨论】:

  • @OliCharlesworth "malloc: *** error for object 0x100100a20: pointer being free was not assigned" on line "delete tmp;"
  • 另一种实现 single reader_single_ writer 队列的简单方法是使用 2 个队列和一个原子“交换缓冲区”:队列 @9​​87654327@ 用于读取,队列 @ 987654328@ 用于写入,“交换缓冲区”S 最初设置为 null。用法:生产者写完后:交换W &lt;-&gt; S。消费者像往常一样从R读取,如果R用尽则交换R &lt;-&gt; S;交换后尝试再读一次。最后,如果读取器(或写入器)遇到一个 nullptr before 读取(或写入)然后交换R &lt;-&gt; S(或W &lt;-&gt; S)...
  • ... 优点是简单;缺点是如果在读取之间没有写入任何内容,消费者可能会反复交换空队列。在我的实现中,我实现了一个 listen() 方法,该方法会阻止阅读器,直到作者通知它为止。

标签: c++ lock-free


【解决方案1】:

您需要创建几个指针 std::atomic,正如您所注意到的,并且您需要在循环中使用 compare_exchange_weak 以原子方式更新它们。否则,多个消费者可能会消费同一个节点,并且多个生产者可能会破坏列表。

【讨论】:

  • 我只对单一生产者/消费者案例感兴趣,但我必须在另一台机器上测试原子代码以验证它是否有效。
【解决方案2】:

这些写入(只是代码中的一个示例)按顺序发生非常重要:

last->next = new Node(t);    // add the new item
last = last->next;      // publish it

这不是 C++ 所保证的——优化器可以按照自己的喜好重新排列事物,只要当前线程总是表现得好像程序完全按照您编写的方式运行。然后 CPU 缓存可以出现并进一步重新排序。

你需要内存栅栏。让指针使用原子类型应该会有这样的效果。

【讨论】:

  • 为什么编译器会重新排序这些?如,编译器如何证明重新排序的合理性?
  • @user1715122:C++ 内存模型证明了这一点。在此之前,它是通过顺序一致性来证明的。就编译器而言,对非易失性内存的更改是仅对当前线程可见的副作用。所以它可以写temp1 = new Node(t); temp2 = last; last = temp1; temp2-&gt;next = temp1;,即使编译器不写,CPU 本身也可以乱序获取和释放不同缓存行的所有权,只要它在实际发生写入时拥有缓存行。
  • 或者编译器可以有一个缓存last的寄存器,并且只读取和写入该寄存器而根本不接触内存位置。任何优化都由 as-if 规则证明,除非它改变了可见的副作用。这里没有可见的副作用。
【解决方案3】:

这可能完全不合时宜,但我不禁想知道您是否遇到了某种与静态初始化相关的问题...为了笑声,请尝试将 q 声明为 指针 em> 到一个无锁队列向量,并将其分配到main() 中的堆上。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2011-02-11
    • 2014-10-31
    • 2012-01-12
    • 2012-01-28
    • 2011-08-30
    • 2016-08-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多