【问题标题】:DWCAS-alternative with no help of the kernel无需内核帮助的 DWCAS 替代方案
【发布时间】:2021-11-13 14:47:21
【问题描述】:

我只是想测试一下我的编译器是否能识别...

atomic<pair<uintptr_t, uintptr_t>

...并在其上使用 DWCAS(如 x86-64 lock cmpxchg16b),或者如果它使用通常的锁来补充这对。

所以我首先编写了一个带有单个 noinline 函数的最小程序,该函数在原子对上进行比较和交换。编译器为此生成了很多我不理解的代码,并且我没有看到任何带有 LOCK 前缀的指令。我很好奇是否 实现在原子中放置了一个锁,并在 64 位平台上打印了上述原子对的 sizeof:24,因此显然没有锁。

最后,我编写了一个程序,该程序通过我系统具有的所有线程(Ryzen Threadripper 64 内核、Win10、SMT 关闭)将单个原子对的两个部分递增预定义的次数。然后我计算了每个增量的时间(以纳秒为单位)。时间相当长,每次成功增量大约需要 20.000ns,所以它首先查看 如果有一把锁我忽略了;所以对于这个 24 字节的原子的 sizeof 来说,这是不可能的。当我在 Processs Viewer 中看到时,我看到所有 64 个内核一直几乎处于 100%user CPU 时间 - 所以不可能有任何内核干预。

那么这里有没有人比我更聪明,并且可以从程序集转储中识别出这个 DWCAS 替代品做了什么?

这是我的测试代码:

#include <iostream>
#include <atomic>
#include <utility>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <vector>

using namespace std;
using namespace chrono;

struct uip_pair
{
    uip_pair() = default;
    uip_pair( uintptr_t first, uintptr_t second ) :
        first( first ),
        second( second )
    {
    }
    uintptr_t first, second;
};

using atomic_pair = atomic<uip_pair>;

int main()
{
    cout << "sizeof(atomic<pair<uintptr_t, uintptr_t>>): " << sizeof(atomic_pair) << endl;
    atomic_pair ap( uip_pair( 0, 0 ) );
    cout << "atomic<pair<uintptr_t, uintptr_t>>::is_lock_free: " << ap.is_lock_free() << endl;
    mutex mtx;
    unsigned nThreads = thread::hardware_concurrency();
    unsigned ready = nThreads;
    condition_variable cvReady;
    bool run = false;
    condition_variable cvRun;
    atomic_int64_t sumDur = 0;
    auto theThread = [&]( size_t n )
    {
        unique_lock<mutex> lock( mtx );
        if( !--ready )
            cvReady.notify_one();
        cvRun.wait( lock, [&]() -> bool { return run; } );
        lock.unlock();
        auto start = high_resolution_clock::now();
        uip_pair cmp = ap.load( memory_order_relaxed );
        for( ; n--; )
            while( !ap.compare_exchange_weak( cmp, uip_pair( cmp.first + 1, cmp.second + 1 ), memory_order_relaxed, memory_order_relaxed ) );
        sumDur.fetch_add( duration_cast<nanoseconds>( high_resolution_clock::now() - start ).count(), memory_order_relaxed );
        lock.lock();
    };
    vector<jthread> threads;
    threads.reserve( nThreads );
    static size_t const ROUNDS = 100'000;
    for( unsigned t = nThreads; t--; )
        threads.emplace_back( theThread, ROUNDS );
    unique_lock<mutex> lock( mtx );
    cvReady.wait( lock, [&]() -> bool { return !ready; } );
    run = true;
    cvRun.notify_all();
    lock.unlock();
    for( jthread &thr : threads )
        thr.join();
    cout << (double)sumDur / ((double)nThreads * ROUNDS) << endl;
    uip_pair p = ap.load( memory_order_relaxed );
    cout << "synch: " << (p.first == p.second ? "yes" : "no") << endl;
}

[编辑]:我已将 compare_exchange_weak-function 提取为 noinline-function 并反汇编代码:

struct uip_pair
{
    uip_pair() = default;
    uip_pair( uintptr_t first, uintptr_t second ) :
        first( first ),
        second( second )
    {
    }
    uintptr_t first, second;
};

using atomic_pair = atomic<uip_pair>;

#if defined(_MSC_VER)
    #define NOINLINE __declspec(noinline)
#elif defined(__GNUC__) || defined(__clang__)
    #define NOINLINE __attribute((noinline))
#endif

NOINLINE
bool cmpXchg( atomic_pair &ap, uip_pair &cmp, uip_pair xchg )
{
    return ap.compare_exchange_weak( cmp, xchg, memory_order_relaxed, memory_order_relaxed );
}

    mov    eax, 1
    mov    r10, rcx
    mov    r9d, eax
    xchg   DWORD PTR [rcx], eax
    test   eax, eax
    je     SHORT label8
label1:
    mov    eax, DWORD PTR [rcx]
    test   eax, eax
    je     SHORT label7
label2:
    mov    eax, r9d
    test   r9d, r9d
    je     SHORT label5
label4:
    pause
    sub    eax, 1
    jne    SHORT label4
    cmp    r9d, 64
    jl     SHORT label5
    lea    r9d, QWORD PTR [rax+64]
    jmp    SHORT label6
label5:
    add    r9d, r9d
label6:
    mov    eax, DWORD PTR [rcx]
    test   eax, eax
    jne    SHORT label2
label7:
    mov    eax, 1
    xchg   DWORD PTR [rcx], eax
    test   eax, eax
    jne    SHORT label1
label8:
    mov    rax, QWORD PTR [rcx+8]
    sub    rax, QWORD PTR [rdx]
    jne    SHORT label9
    mov    rax, QWORD PTR [rcx+16]
    sub    rax, QWORD PTR [rdx+8]
label9:
    test   rax, rax
    sete   al
    test   al, al
    je     SHORT label10
    movups xmm0, XMMWORD PTR [r8]
    movups XMMWORD PTR [rcx+8], xmm0
    xor    ecx, ecx
    xchg   DWORD PTR [r10], ecx
    ret
label10:
    movups xmm0, XMMWORD PTR [rcx+8]
    xor    ecx, ecx
    movups XMMWORD PTR [rdx], xmm0
    xchg   DWORD PTR [r10], ecx
    ret

也许有人理解反汇编。请记住,XCHG 在 x86 上是隐式锁定的。在我看来,MSVC 在这里使用了某种软件事务内存。我可以任意扩展嵌入在 atomic 中的共享结构,但差异仍然是 8 个字节;所以 MSVC 总是使用某种 STM。

【问题讨论】:

  • 你可能想看看std::atomic<T>::is_lock_free的结果
  • 正如@RichardCritten 指出的那样,使用is_lock_free 函数来确定它是否在后台使用了锁。话虽如此,如果我是你,我会测试顺序一致的内存排序(默认为std::atomic)和std::mutex 之间的性能差异。我希望,在低争用情况下,std::mutex 的性能会比std::atomic 上的顺序一致操作更好。
  • this output 可以看出,MSVC 正在围绕它放置一个自旋锁互斥体。我不能很容易地实际测试它。
  • 哦,好点。看起来这是一个 gcc libatomic 决定。 github.com/gcc-mirror/gcc/blob/… 说“用户可能期望‘无锁’也意味着‘快速’,这就是为什么我们不返回 true,例如,如果我们使用 CAS 实现具有这种大小/对齐的加载。”这正是 x86-64 上 16 字节的情况 - 如果您单步执行 __atomic_load_16,它也会执行 lock cmpxchg16b
  • 快速浏览一下代码,我认为MSVC 正在将一个简单的自旋锁作为atomic&lt;pair&lt;...&gt;&gt; 的成员。这就是为什么它是 24 字节而不是 16 字节的原因,如果编译器要使用原子 DCAS 处理它,您会期望它。并且可以在反汇编中看到:rcx指向要处理的atomic&lt;pair&lt;...&gt;&gt;DWORD PTR [rcx]是锁,数据本身在QWORD PTR [rcx+8]QWORD PTR [rcx+16]。将此称为“软件事务性内存”似乎太花哨了。

标签: c++ visual-c++ x86-64 atomic stdatomic


【解决方案1】:

正如 Nate 在 cmets 中指出的,它是一个自旋锁。

您可以查找源代码,它与编译器一起提供。并且可以在 Github 上找到。

如果您构建未优化的调试,您可以在交互式调试期间单步执行此源!

a member variable called _Spinlock

这是locking function

#if 1 // TRANSITION, ABI, GH-1151
inline void _Atomic_lock_acquire(long& _Spinlock) noexcept {
#if defined(_M_IX86) || (defined(_M_X64) && !defined(_M_ARM64EC))
    // Algorithm from Intel(R) 64 and IA-32 Architectures Optimization Reference Manual, May 2020
    // Example 2-4. Contended Locks with Increasing Back-off Example - Improved Version, page 2-22
    // The code in mentioned manual is covered by the 0BSD license.
    int _Current_backoff   = 1;
    const int _Max_backoff = 64;
    while (_InterlockedExchange(&_Spinlock, 1) != 0) {
        while (__iso_volatile_load32(&reinterpret_cast<int&>(_Spinlock)) != 0) {
            for (int _Count_down = _Current_backoff; _Count_down != 0; --_Count_down) {
                _mm_pause();
            }
            _Current_backoff = _Current_backoff < _Max_backoff ? _Current_backoff << 1 : _Max_backoff;
        }
    }
#elif defined(_M_ARM) || defined(_M_ARM64) || defined(_M_ARM64EC)
    while (_InterlockedExchange(&_Spinlock, 1) != 0) { // TRANSITION, GH-1133: _InterlockedExchange_acq
        while (__iso_volatile_load32(&reinterpret_cast<int&>(_Spinlock)) != 0) {
            __yield();
        }
    }
#else // ^^^ defined(_M_ARM) || defined(_M_ARM64) || defined(_M_ARM64EC) ^^^
#error Unsupported hardware
#endif
}

(披露:我从英特尔手册中引入了越来越多的回退,之前只是一个 xchg 循环,issuePR

众所周知,自旋锁的使用不是最理想的,而是应该使用执行内核等待的互斥锁。自旋锁的问题在于,在极少数情况下,当低优先级线程持有自旋锁时发生上下文切换时,需要一段时间才能解锁该自旋锁,因为调度程序不会意识到高优先级线程正在等待该自旋锁。

当然不使用cmpxchg16b 也是次优的。尽管如此,对于更大的原子,必须使用非无锁机制。 (没有决定避免 cmpxchg16b,这只是 ABI 兼容 Visual Studio 2015 的结果)

有一个关于让它变得更好的问题,希望在下一次 ABI 中断时得到解决:https://github.com/microsoft/STL/issues/1151

至于事务内存。在那里使用硬件事务内存可能是有意义的。我可以推测英特尔 RTM 可能会在那里用内在函数实现,或者可能会有一些未来的操作系统 API 用于它们(例如,增强的SRWLOCK),但很可能没有人会想要更多的复杂性在那里,因为非无锁原子是一种兼容性工具,而不是你故意想要使用的东西。

【讨论】:

  • RTM 可用作lock cmpxchg16b 的替代品,可能使纯负载更高效。 (特别是对于多个并发读取器,因为它真的是只读的,不需要让行进入 Exclusive 或 Modified 状态。)对于纯存储可能不会更快,除了避免竞争存储上的 cx16 重试循环。当然,如果你有一个支持任何其他方法的 128 位整数类型,也可以替换 cx16 重试循环。
  • RTM 将与使用 lock cmpxchg16 的其他线程兼容,因此它不会是 ABI 中断;这就是美丽或事务性内存:它只是用多条指令进行原子事务,而不是像lock xadd 这样的限制。 (当然,涉及多个内存位置的事务是一种全新的能力,就像实际的 DCAS 一样,而不仅仅是用于一对连续指针的 DWCAS。)
  • @PeterCordes,启用 RTM 后无法立即访问 CPU,这使我无法尝试在任何地方引入 RTM,否则我可能已经尝试过了
  • 我自己并没有真正玩过它,我只是了解它如何在 CPU 架构级别上工作的概念。 (或者至少我认为我会:P 基于realworldtech.com/haswell-tm 之类的东西,并了解lock cmpxchg16b 在内部如何工作)。使用它的实际问题包括在拥有它的 CPU 上有效地使用正确的代码。并且由于英特尔就像露西对我们的查理布朗踢足球一样,它在越来越多的曾经支持它的 CPU 上不断被禁用。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-08-13
  • 2010-10-27
  • 1970-01-01
  • 2021-10-17
  • 1970-01-01
相关资源
最近更新 更多