【问题标题】:Read-write thread-safe smart pointer in C++, x86-64C++ 中的读写线程安全智能指针,x86-64
【发布时间】:2011-11-04 09:30:33
【问题描述】:

我开发了一些无锁数据结构,但出现了以下问题。

我有编写器线程,它在堆上创建对象并将它们包装在带有引用计数器的智能指针中。我也有很多读者线程,可以处理这些对象。代码可能如下所示:

SmartPtr ptr;

class Reader : public Thread {
    virtual void Run {
       for (;;) {
           SmartPtr local(ptr);
           // do smth   
       }
    }   
};

class Writer : public Thread {
    virtual void Run {
       for (;;) {
           SmartPtr newPtr(new Object);    
           ptr = newPtr;  
       }
    }
};

int main() {
    Pool* pool = SystemThreadPool();
    pool->Run(new Reader());
    pool->Run(new Writer());
    for (;;) // wait for crash :(
}

当我创建 ptr 的线程本地副本时,这意味着至少

  1. 读取地址。
  2. 增加引用计数器。

我不能原子地执行这两个操作,因此有时我的读者会处理已删除的对象。

问题是——我应该使用什么样的智能指针来实现从多个线程进行读写访问并进行正确的内存管理?应该有解决方案,因为Java程序员根本不关心这样的问题,仅仅依靠所有对象都是引用并且只有在没有人使用它们时才被删除。

对于 PowerPC,我找到了 http://drdobbs.com/184401888,看起来不错,但使用了 x86 中没有的加载链接和存储条件指令。

据我了解,boost 指针仅使用锁提供此类功能。我需要无锁解决方案。

【问题讨论】:

  • std::shared_ptr? (或者,boost::shared_ptr 如果您的实现还不支持它。)
  • boost::shared_ptr有什么问题吗?
  • 我很确定 JVM 使用了锁。执行速度是您想要避免锁定的原因吗?检查之前的问题,例如 c:\Code\boost_1_47_0\boost;但我想知道您是否不必选择基于锁定的解决方案、依赖于特定指令或 C++11 语言支持的解决方案,或者不安全的解决方案,例如您正在使用的解决方案。
  • 为什么要无锁?它是您应用的瓶颈吗?
  • 我认为 std::shared_ptr 不支持原子分配。

标签: c++ thread-safety x86 smart-pointers lock-free


【解决方案1】:

boost::shared_ptr 有 atomic_store,它使用“无锁”自旋锁,对于 99% 的可能情况应该足够快。

    boost::shared_ptr<Object> ptr;
class Reader : public Thread {
    virtual void Run {
       for (;;) {
           boost::shared_ptr<Object> local(boost::atomic_load(&ptr));
           // do smth   
       }
    }   
};

class Writer : public Thread {
    virtual void Run {
       for (;;) {
           boost::shared_ptr<Object> newPtr(new Object);    
           boost::atomic_store(&ptr, newPtr);
       }
    }
};

int main() {
    Pool* pool = SystemThreadPool();
    pool->Run(new Reader());
    pool->Run(new Writer());
    for (;;)
}

编辑:

响应下面的评论,实现在“boost/shared_ptr.hpp”...

template<class T> void atomic_store( shared_ptr<T> * p, shared_ptr<T> r )
{
    boost::detail::spinlock_pool<2>::scoped_lock lock( p );
    p->swap( r );
}

template<class T> shared_ptr<T> atomic_exchange( shared_ptr<T> * p, shared_ptr<T> r )
{
    boost::detail::spinlock & sp = boost::detail::spinlock_pool<2>::spinlock_for( p );

    sp.lock();
    p->swap( r );
    sp.unlock();

    return r; // return std::move( r )
}

【讨论】:

  • 是的,向我们展示 atomic_swap 的实现,您的答案将是完整的。
  • 实现在 boost/shared_ptr.hpp 中。正如我在回答中提到的,这是一个简单的自旋锁。
  • +1 谢谢。我在另一个答案中建议了无锁实现,但自旋锁会更简单。也一样好,只要它在一定次数的旋转后不会回退到互斥锁。
  • 只要确保你在不同的内核上运行线程,否则自旋锁会适得其反,而互斥锁回退实际上会非常有益!
  • 我的建议是保持简单,除非您有很多线程竞争(+8 或更多),否则您不会看到此自旋锁和无锁解决方案之间的任何区别。请记住,CAS 指令不是免费的,因此如果争用较少,自旋锁实际上可能更快。这是我在这件事上的两分钱。祝你好运,RobH 解决方案将是无锁的。
【解决方案2】:

通过一些 jiggery-pokery,您应该能够使用 InterlockedCompareExchange128 完成此操作。将引用计数和指针存储在 2 元素 __int64 数组中。如果引用计数在数组[0] 中并且指针在数组[1] 中,则原子更新将如下所示:

while(true)
{
    __int64 comparand[2];
    comparand[0] = refCount;
    comparand[1] = pointer;
    if(1 == InterlockedCompareExchange128(
        array,
        pointer,
        refCount + 1,
        comparand))
    {
        // Pointer is ready for use. Exit the while loop.
    }
}

如果 InterlockedCompareExchange128 内部函数不适用于您的编译器,那么您可以改用底层 CMPXCHG16B 指令,前提是您不介意在汇编语言中乱七八糟。

【讨论】:

  • InterlockedCompareExchange64 不应该满足 x86 的要求吗?
  • 我看到 CMPXCHG16B 指令在 x64-64 指令集中可用,因此无论使用什么编译器内部函数都可以。或者 OP 可以深入到 __asm 部分:-o
  • @zennehoy 是的,它会的。不过 OP 是 64 位的,所以 InterlockedCompareExchange128 是原子处理指针和引用计数所必需的。
  • 呃,-1?有人对这个答案有疑问吗?
  • x86-64 == x64,所以 InterlockedCompareExchange128 可用。我只是说该解决方案适用于 x86 和 x64,分别使用 I-C-E-64 或 -128。不知道为什么这得到了反对票,请投赞成票。
【解决方案3】:

RobH 提出的解决方案不起作用。和原问题有同样的问题:访问引用计数对象时,可能已经被删除了。

我认为在没有全局锁(如 boost::atomic_store 中)或条件读/写指令的情况下解决问题的唯一方法是以某种方式延迟对象(或共享引用计数对象,如果这样的话)的销毁用过的)。所以 zennehoy 有一个好主意,但他的方法太不安全了。

我可能这样做的方法是在编写器线程中保留所有指针的副本,以便编写器可以控制对象的销毁:

class Writer : public Thread {
    virtual void Run() {
        list<SmartPtr> ptrs; //list that holds all the old ptr values        

        for (;;) {
            SmartPtr newPtr(new Object);
            if(ptr)
                ptrs.push_back(ptr); //push previous pointer into the list
            ptr = newPtr;

            //Periodically go through the list and destroy objects that are not
            //referenced by other threads
            for(auto it=ptrs.begin(); it!=ptrs.end(); )
                if(it->refCount()==1)
                    it = ptrs.erase(it);
                else
                    ++it;
       }
    }
};

但是,对于智能指针类仍有一些要求。这不适用于 shared_ptr,因为读取和写入不是原子的。它几乎适用于 boost::intrusive_ptr。 intrusive_ptr 上的赋值是这样实现的(伪代码):

//create temporary from rhs
tmp.ptr = rhs.ptr;
if(tmp.ptr)
    intrusive_ptr_add_ref(tmp.ptr);

//swap(tmp,lhs)
T* x = lhs.ptr;
lhs.ptr = tmp.ptr;
tmp.ptr = x;

//destroy temporary
if(tmp.ptr)
    intrusive_ptr_release(tmp.ptr);

据我所知,这里唯一缺少的是lhs.ptr = tmp.ptr; 之前的编译器级内存栅栏。加上这一点,在严格的条件下读取rhs 和写入lhs 都是线程安全的:1) x86 或 x64 架构 2) 原子引用计数 3) rhs 在赋值期间引用计数不得为零(保证通过上面的 Writer 代码)4)只有一个线程写入 lhs(使用 CAS 你可以有多个 writer)。

无论如何,您可以根据 intrusive_ptr 创建自己的智能指针类,并进行必要的更改。绝对比重新实现 shared_ptr 容易。此外,如果您想要性能,侵入式是您的最佳选择。

【讨论】:

  • 谢谢,你的回答跟我想的差不多。
【解决方案4】:

这在 java 中更容易工作的原因是垃圾收集。在 C++ 中,当您想要删除某个值时,您必须手动确保该值不只是开始被其他线程使用。

我在类似情况下使用的解决方案是简单地延迟删除值。我创建了一个单独的线程,它遍历要删除的内容列表。当我想删除某些东西时,我会将它添加到这个带有时间戳的列表中。删除线程在实际删除该值之前等到此时间戳之后的某个固定时间。您只需确保延迟足够大,以保证该值的任何临时使用都已完成。

在我的情况下,100 毫秒就足够了,为了安全起见,我选择了几秒钟。

【讨论】:

  • 在我看来,这是一个相当复杂且有风险的解决方案。
  • 对于删除无锁链表中的节点,这是我能想到的唯一解决方案。问题是当一个节点被标记为删除时,另一个线程可能已经开始使用它(例如通过迭代它)。如果您知道任何替代方案,我很想听听!
  • mine 和 RobHs 都是更好的选择。
猜你喜欢
  • 2010-10-22
  • 1970-01-01
  • 2011-08-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-07-18
相关资源
最近更新 更多