【问题标题】:Atomic operations for lock-free doubly linked list无锁双向链表的原子操作
【发布时间】:2013-10-26 17:04:44
【问题描述】:

我正在根据这些论文写一个无锁双向链表:

《基于引用计数的高效可靠的无锁内存回收》 Anders Gidenstam,成员,IEEE,Marina Papatriantafilou,H˚ akan Sundell 和 Philippas Tsigas

“无锁双端队列和双向链表” Håkan Sundell, Philippas Tsigas

对于这个问题,我们可以先把第一篇论文放在一边。

在本文中,他们使用一种智能的方式将删除标志和指针存储在一个单词中。 (更多信息here

论文中这部分的伪代码:

union Link
    : word
    (p,d): {pointer to Node, boolean} 

structure Node
    value: pointer to word
    prev: union Link
    next: union Link

上面的伪代码是我的代码:

template< typename NodeT >
struct LockFreeLink
{
public:
    typedef NodeT NodeType;

private:

protected:
    std::atomic< NodeT* > mPointer;

public:
    bcLockFreeLink()
    {
        std::atomic_init(&mPointer, nullptr);
    }
    ~bcLockFreeLink() {}

    inline NodeType* getNode() const throw()
    {
        return std::atomic_load(&mPointer, std::memory_order_relaxed);
    }
    inline std::atomic< NodeT* >* getAtomicNode() const throw()
    {
        return &mPointer;
    }
};

struct Node : public LockFreeNode
{
    struct Link : protected LockFreeLink< Node >
    {
        static const int dMask = 1;
        static const int ptrMask = ~dMask;

        Link() { } throw()
        Link(const Node* pPointer, bcBOOL pDel = bcFALSE) throw()
        { 
            std::atomic_init(&mPointer, (reinterpret_cast<int>(pPointer) | (int)pDel)); 
        }

        Node* pointer() const throw() 
        { 
            return reinterpret_cast<Node*>(
                std::atomic_load(&data, std::memory_order_relaxed) & ptrMask); 
        }
        bool del() const throw() 
        { 
            return std::atomic_load(&data, std::memory_order_relaxed) & dMask; 
        }
        bool compareAndSwap(const Link& pExpected, const Link& pNew) throw() 
        { 
            Node* lExpected = std::atomic_load(&pExpected.mPointer, std::memory_order_relaxed);
            Node* lNew = std::atomic_load(&pNew.mPointer, std::memory_order_relaxed);

            return std::atomic_compare_exchange_strong_explicit(
                &mPointer,
                &lExpected,
                lNew,
                std::memory_order_relaxed,
                std::memory_order_relaxed); 
        }

        bool operator==(const Link& pOther) throw() 
        { 
            return std::atomic_load(data, std::memory_order_relaxed) == 
                std::atomic_load(pOther.data, std::memory_order_relaxed); 
        }
        bool operator!=(const Link& pOther) throw() 
        { 
            return !operator==(pOther); 
        }
    };

    Link mPrev;
    Link mNext;
    Type mData;

    Node() {};
    Node(const Type& pValue) : mData(pValue) {};
};

本文有设置链接删除标记为真的功能:

procedure SetMark(link: pointer to pointer to Node)
    while true do
       node = *link;
       if node.d = true or CAS(link, node, (node.p, true)) then break;

我的这个函数的代码:

void _setMark(Link* pLink)
{
    while (bcTRUE)
    {
        Link lOld = *pLink;
        if(pLink->del() || pLink->compareAndSwap(lOld, Link(pLink->pointer(), bcTRUE)))
            break;
    }
}

但我的问题是在compareAndSwap 函数中,我必须比较和交换三个原子变量。有关问题的信息是here

(实际上new比较和交换函数中的变量并不重要,因为它是线程本地的)

现在我的问题是:如何编写 compareAndSwap 函数来比较和交换三个原子变量,或者我在哪里出错?

(请原谅我的问题很长)

编辑:

类似的问题在内存管理器论文中:

function CompareAndSwapRef(link:pointer to pointer toNode,
old:pointer toNode, new:pointer toNode):boolean
    if CAS(link,old,new) then
        if new=NULL then
            FAA(&new.mmref,1);
            new.mmtrace:=false;
    if old=NULLthen FAA(&old.mmref,-1);
    return true;
return false; 

这里我必须再次比较和交换三个原子变量。 (请注意,我的参数是 Link 的类型,我必须比较和交换 mPointerLink

【问题讨论】:

    标签: c++ multithreading c++11 atomic lock-free


    【解决方案1】:

    除非您可以使您正在比较/交换的三个数据项适合两个指针大小的元素,否则您不能通过比较和交换来做到这一点(当然不是在 x86 上,而且我还没有听说过任何其他的具有这种东西的机器架构)。

    如果您依赖存储在(至少)与偶数字节地址对齐的地址上的数据,您可以在删除元素时使用按位或设置最低位。过去,人们一直使用地址的上半部分来存储额外的数据,但至少在 x86-64 中,这是不可能的,因为地址的上半部分必须是“规范的”,这意味着任何地址位高于“可用限制”(由处理器架构定义,目前为 48 位),必须都与可用限制的最高位相同(因此,与第 47 位相同)。

    编辑:这部分代码完全符合我的描述:

        static const int dMask = 1;
        static const int ptrMask = ~dMask;
    
        Link() { } throw()
        Link(const Node* pPointer, bcBOOL pDel = bcFALSE) throw()
        { 
            std::atomic_init(&mPointer, (reinterpret_cast<int>(pPointer) | (int)pDel)); 
        }
    
        Node* pointer() const throw() 
        { 
            return reinterpret_cast<Node*>(
                std::atomic_load(&data, std::memory_order_relaxed) & ptrMask); 
        }
    

    它使用最低位来存储pDel 标志。

    您应该能够使用cmpxchg16b 的形式对双链表执行此操作(在x86 上)。在 Windows 系统中,这将是 _InterlockedCompareExchange128。在 gcc(对于 Unix 类型的操作系统,例如 Linux/MacOS)中,您需要首先从您的两个指针构造一个 int128。如果您正在编译 32 位代码,您可能需要为 Windows 和 Unix 操作系统创建一个 64 位 int。

    【讨论】:

    • 我认为使用最高位仍然可以,只要在尝试使用指针之前屏蔽布尔值即可。尽管从技术上讲,这将是未定义的行为。
    • 您关于按位或的讨论是正确的,但总的来说我想使用 CAS。
    • 本文假设指针在 32 位或 64 位架构上具有 4 字节或 8 字节对齐。这提供了 2 位或 3 位可供使用。该论文使用一个作为删除标记,一个用于锁定。
    • 参考论文的要点是,一次实际上只有一个指针被锁定。本文将结构视为带有prev 指针的单链表,这些指针可能不是最新的。该方案一次只需要一个 CAS 操作。它有助于实际阅读论文。
    • @MohammadRB 为什么不能使用std::atomic_compare_exchange_* 函数之一?他们直接实现CAS。 CAS的要点是它只在值相等时交换,如果值不相等则返回false。 SetMark伪代码的终止条件是删除标记被另一个线程设置或比较和交换成功,设置删除标记。
    【解决方案2】:

    http://www.drdobbs.com/cpp/lock-free-code-a-false-sense-of-security/210600279

    但是通过编写自己的无锁代码来批量替换锁是 不是答案。无锁代码有两个主要缺点。首先,它是 对解决典型问题没有广泛的用处——大量的基础数据 结构,甚至是双向链表,仍然没有已知的无锁 实现。提出新的或改进的无锁数据 结构仍然会为你赢得至少一篇在评审中发表的论文 期刊,有时还有学位。

    我认为使用它的效率不够,但无论如何它读起来很有趣。

    【讨论】:

    • 这个答案具有误导性,首先是存在不完美但相当不错的无锁双向链表实现,其次是它建议完全不要使用无锁代码。在锁的争用可能很高的情况下,无锁代码的性能明显优于使用锁的代码。
    【解决方案3】:

    在 x64 上,仅使用 44 位地址空间。如果您的指针对齐到 8 个字节,那么您只使用 41 位。对于 64 位,41x2 仍然太大。虽然我不能保证它的速度,但有一个 128 位的比较和交换。我总是尝试使用 64 位的。

    也许您最多只需要 20 亿个节点。因此,您可以做的是预先分配列表从中提取的节点池。当然,您可以通过使用原子操作获取下一个空闲池索引来创建节点。然后代替 next 和 prev 是指针,它们可以是节点池的 31 位索引,并且您有 2 位剩余用于删除标志。假设您不需要 20 亿个节点,那么您还剩下更多位。唯一的缺点是您必须知道在启动时需要多少个节点,尽管如果您也有的话,您可以重新分配节点。

    我所做的是使用虚拟内存函数来保留 GB 的地址空间,然后将物理内存映射到该空间,因为我需要它来扩展我的池,而无需重新分配。

    【讨论】:

      猜你喜欢
      • 2023-03-04
      • 2012-07-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-10-25
      • 1970-01-01
      • 2022-01-12
      相关资源
      最近更新 更多