【问题标题】:Lock-Free Queue with boost::atomic - Am I doing this right?带有 boost::atomic 的无锁队列 - 我这样做对吗?
【发布时间】:2012-03-16 01:40:14
【问题描述】:

短版:

我正在尝试替换来自 here 的无锁、单一生产者、单一消费者队列实现中使用的 C++11 中的 std::atomic。如何将其替换为 boost::atomic

加长版:

我正在尝试通过工作线程从我们的应用中获得更好的性能。每个线程都有自己的任务队列。我们必须在每个任务出队/入队之前使用锁进行同步。

然后我找到了 Herb Sutter 关于无锁队列的文章。这似乎是一个理想的替代品。但是代码使用了来自 C++11 的std::atomic,我目前无法将其引入项目中。

更多的谷歌搜索导致了一些例子,例如this one for Linux (echelon's)this one for Windows (TINESWARE's)。两者都使用平台的特定构造,例如 WinAPI 的 InterlockedExchangePointer 和 GCC 的 __sync_lock_test_and_set

我只需要支持 Windows 和 Linux,所以也许我可以摆脱一些 #ifdefs。但我认为使用boost::atomic 提供的内容可能会更好。 Boost Atomic 还不是官方 Boost 库的一部分。所以我从http://www.chaoticmind.net/~hcb/projects/boost.atomic/ 下载了源代码,并在我的项目中使用了包含文件。

这是我目前得到的:

#pragma once

#include <boost/atomic.hpp>

template <typename T>
class LockFreeQueue
{
private:
    struct Node
    {
        Node(T val) : value(val), next(NULL) { }
        T value;
        Node* next;
    };
    Node* first; // for producer only
    boost::atomic<Node*> divider;  // shared
    boost::atomic<Node*> last; // shared

public:
    LockFreeQueue()
    {
        first = new Node(T());
        divider = first;
        last= first;
    }

    ~LockFreeQueue()
    {
        while(first != NULL) // release the list
        {
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }

    void Produce(const T& t)
    {
        last.load()->next = new Node(t); // add the new item
        last = last.load()->next;
        while(first != divider) // trim unused nodes
        {
            Node* tmp = first;
            first = first->next;
            delete tmp;
        }
    }

    bool Consume(T& result)
    {
        if(divider != last) // if queue is nonempty
        {
            result = divider.load()->next->value; // C: copy it back
            divider = divider.load()->next;
            return true;  // and report success
        }
        return false;  // else report empty
    }
};

需要注意的一些修改:

boost::atomic<Node*> divider;  // shared
boost::atomic<Node*> last; // shared

    last.load()->next = new Node(t); // add the new item
    last = last.load()->next;

        result = divider.load()->next->value; // C: copy it back
        divider = divider.load()->next;

我是否在这里正确地应用了来自 boost::atomic 的 load()(和隐式 store())?我们可以说这相当于 Sutter 最初的 C++11 无锁队列吗?

PS。我研究了 SO 上的许多线程,但似乎没有一个提供 boost::atomic 和无锁队列的示例。

【问题讨论】:

  • 在您开始探索无锁马蜂窝之前,请考虑为您的工作线程提供更大的有效负载,以便花在互斥争用上的时间变得不那么重要。例如,如果您正在处理数据流,请将数据收集成大块,然后再将其传递给工作线程。
  • FWIF,请注意,Lockfree 库已被 Boost 接受:lists.boost.org/boost-announce/2011/08/0331.phptim.klingt.org/boost_lockfree
  • @EmileCormier 我会处理这个,谢谢。
  • @m3rLinEz 你的代码使用成功了吗?
  • 正如 Igor R 提到的,无锁队列已经在 boost 中可用。可以直接使用。

标签: c++ multithreading boost atomic lock-free


【解决方案1】:

你试过Intel Thread Building Blocks'atomic&lt;T&gt;吗?跨平台且免费。

还有……

单一生产者/单一消费者使您的问题变得更容易,因为您的线性化点可以是单个运算符。如果您准备好接受有界队列,它会变得更容易。

有界队列提供缓存性能优势,因为您可以保留缓存对齐的内存块以最大化您的命中,例如:

#include <vector>
#include "tbb/atomic.h"
#include "tbb/cache_aligned_allocator.h"    

template< typename T >
class SingleProdcuerSingleConsumerBoundedQueue { 
    typedef vector<T, cache_aligned_allocator<T> > queue_type;

public:
    BoundedQueue(int capacity):
        queue(queue_type()) {
        head = 0;
        tail = 0;
        queue.reserve(capacity);
    }

    size_t capacity() {
        return queue.capacity();
    }

    bool try_pop(T& result) {
        if(tail - head == 0)
            return false;
        else {
            result = queue[head % queue.capacity()];
            head.fetch_and_increment(); //linearization point
            return(true);
        }
    }

    bool try_push(const T& source) {
        if(tail - head == queue.capacity()) 
            return(false);
        else {
            queue[tail %  queue.capacity()] = source;
            tail.fetch_and_increment(); //linearization point
            return(true);
        }
    }

    ~BoundedQueue() {}

private:
    queue_type queue;
    atomic<int> head;
    atomic<int> tail;
};

【讨论】:

  • 确实如此,并且 concurrent_bounded_queue 模板类将是一个可爱的解决方案,但我认为 OP 对优化的解决方案感兴趣,因此我仍然提出上述建议
  • tail % queue.capacity() 在 tail 翻转后不会产生负面结果吗?埃文,如果您切换到无符号整数,容量是否必须是 MAX_INT 的除数?
【解决方案2】:

从文档中查看boost.atomic ringbuffer example

#include <boost/atomic.hpp>

template <typename T, size_t Size>
class ringbuffer
{
public:
    ringbuffer() : head_(0), tail_(0) {}

    bool push(const T & value)
    {
        size_t head = head_.load(boost::memory_order_relaxed);
        size_t next_head = next(head);
        if (next_head == tail_.load(boost::memory_order_acquire))
            return false;
        ring_[head] = value;
        head_.store(next_head, boost::memory_order_release);
        return true;
    }

    bool pop(T & value)
    {
        size_t tail = tail_.load(boost::memory_order_relaxed);
        if (tail == head_.load(boost::memory_order_acquire))
            return false;
        value = ring_[tail];
        tail_.store(next(tail), boost::memory_order_release);
        return true;
    }

private:
    size_t next(size_t current)
    {
        return (current + 1) % Size;
    }

    T ring_[Size];
    boost::atomic<size_t> head_, tail_;
};

// How to use    
int main()
{
    ringbuffer<int, 32> r;

    // try to insert an element
    if (r.push(42)) { /* succeeded */ }
    else { /* buffer full */ }

    // try to retrieve an element
    int value;
    if (r.pop(value)) { /* succeeded */ }
    else { /* buffer empty */ }
}

代码的唯一限制是缓冲区长度必须在编译时知道(或者在构造时,如果您将数组替换为std::vector&lt;T&gt;)。据我了解,让缓冲区增长和缩小并非易事。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-04-21
    • 2012-08-13
    • 2023-03-19
    • 1970-01-01
    • 1970-01-01
    • 2013-04-22
    • 2015-01-13
    • 2014-02-19
    相关资源
    最近更新 更多