【发布时间】:2021-11-13 14:47:21
【问题描述】:
我只是想测试一下我的编译器是否能识别...
atomic<pair<uintptr_t, uintptr_t>
...并在其上使用 DWCAS(如 x86-64 lock cmpxchg16b),或者如果它使用通常的锁来补充这对。
所以我首先编写了一个带有单个 noinline 函数的最小程序,该函数在原子对上进行比较和交换。编译器为此生成了很多我不理解的代码,并且我没有看到任何带有 LOCK 前缀的指令。我很好奇是否 实现在原子中放置了一个锁,并在 64 位平台上打印了上述原子对的 sizeof:24,因此显然没有锁。
最后,我编写了一个程序,该程序通过我系统具有的所有线程(Ryzen Threadripper 64 内核、Win10、SMT 关闭)将单个原子对的两个部分递增预定义的次数。然后我计算了每个增量的时间(以纳秒为单位)。时间相当长,每次成功增量大约需要 20.000ns,所以它首先查看 如果有一把锁我忽略了;所以对于这个 24 字节的原子的 sizeof 来说,这是不可能的。当我在 Processs Viewer 中看到时,我看到所有 64 个内核一直几乎处于 100%user CPU 时间 - 所以不可能有任何内核干预。
那么这里有没有人比我更聪明,并且可以从程序集转储中识别出这个 DWCAS 替代品做了什么?
这是我的测试代码:
#include <iostream>
#include <atomic>
#include <utility>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <vector>
using namespace std;
using namespace chrono;
struct uip_pair
{
uip_pair() = default;
uip_pair( uintptr_t first, uintptr_t second ) :
first( first ),
second( second )
{
}
uintptr_t first, second;
};
using atomic_pair = atomic<uip_pair>;
int main()
{
cout << "sizeof(atomic<pair<uintptr_t, uintptr_t>>): " << sizeof(atomic_pair) << endl;
atomic_pair ap( uip_pair( 0, 0 ) );
cout << "atomic<pair<uintptr_t, uintptr_t>>::is_lock_free: " << ap.is_lock_free() << endl;
mutex mtx;
unsigned nThreads = thread::hardware_concurrency();
unsigned ready = nThreads;
condition_variable cvReady;
bool run = false;
condition_variable cvRun;
atomic_int64_t sumDur = 0;
auto theThread = [&]( size_t n )
{
unique_lock<mutex> lock( mtx );
if( !--ready )
cvReady.notify_one();
cvRun.wait( lock, [&]() -> bool { return run; } );
lock.unlock();
auto start = high_resolution_clock::now();
uip_pair cmp = ap.load( memory_order_relaxed );
for( ; n--; )
while( !ap.compare_exchange_weak( cmp, uip_pair( cmp.first + 1, cmp.second + 1 ), memory_order_relaxed, memory_order_relaxed ) );
sumDur.fetch_add( duration_cast<nanoseconds>( high_resolution_clock::now() - start ).count(), memory_order_relaxed );
lock.lock();
};
vector<jthread> threads;
threads.reserve( nThreads );
static size_t const ROUNDS = 100'000;
for( unsigned t = nThreads; t--; )
threads.emplace_back( theThread, ROUNDS );
unique_lock<mutex> lock( mtx );
cvReady.wait( lock, [&]() -> bool { return !ready; } );
run = true;
cvRun.notify_all();
lock.unlock();
for( jthread &thr : threads )
thr.join();
cout << (double)sumDur / ((double)nThreads * ROUNDS) << endl;
uip_pair p = ap.load( memory_order_relaxed );
cout << "synch: " << (p.first == p.second ? "yes" : "no") << endl;
}
[编辑]:我已将 compare_exchange_weak-function 提取为 noinline-function 并反汇编代码:
struct uip_pair
{
uip_pair() = default;
uip_pair( uintptr_t first, uintptr_t second ) :
first( first ),
second( second )
{
}
uintptr_t first, second;
};
using atomic_pair = atomic<uip_pair>;
#if defined(_MSC_VER)
#define NOINLINE __declspec(noinline)
#elif defined(__GNUC__) || defined(__clang__)
#define NOINLINE __attribute((noinline))
#endif
NOINLINE
bool cmpXchg( atomic_pair &ap, uip_pair &cmp, uip_pair xchg )
{
return ap.compare_exchange_weak( cmp, xchg, memory_order_relaxed, memory_order_relaxed );
}
mov eax, 1
mov r10, rcx
mov r9d, eax
xchg DWORD PTR [rcx], eax
test eax, eax
je SHORT label8
label1:
mov eax, DWORD PTR [rcx]
test eax, eax
je SHORT label7
label2:
mov eax, r9d
test r9d, r9d
je SHORT label5
label4:
pause
sub eax, 1
jne SHORT label4
cmp r9d, 64
jl SHORT label5
lea r9d, QWORD PTR [rax+64]
jmp SHORT label6
label5:
add r9d, r9d
label6:
mov eax, DWORD PTR [rcx]
test eax, eax
jne SHORT label2
label7:
mov eax, 1
xchg DWORD PTR [rcx], eax
test eax, eax
jne SHORT label1
label8:
mov rax, QWORD PTR [rcx+8]
sub rax, QWORD PTR [rdx]
jne SHORT label9
mov rax, QWORD PTR [rcx+16]
sub rax, QWORD PTR [rdx+8]
label9:
test rax, rax
sete al
test al, al
je SHORT label10
movups xmm0, XMMWORD PTR [r8]
movups XMMWORD PTR [rcx+8], xmm0
xor ecx, ecx
xchg DWORD PTR [r10], ecx
ret
label10:
movups xmm0, XMMWORD PTR [rcx+8]
xor ecx, ecx
movups XMMWORD PTR [rdx], xmm0
xchg DWORD PTR [r10], ecx
ret
也许有人理解反汇编。请记住,XCHG 在 x86 上是隐式锁定的。在我看来,MSVC 在这里使用了某种软件事务内存。我可以任意扩展嵌入在 atomic 中的共享结构,但差异仍然是 8 个字节;所以 MSVC 总是使用某种 STM。
【问题讨论】:
-
你可能想看看std::atomic<T>::is_lock_free的结果
-
正如@RichardCritten 指出的那样,使用
is_lock_free函数来确定它是否在后台使用了锁。话虽如此,如果我是你,我会测试顺序一致的内存排序(默认为std::atomic)和std::mutex之间的性能差异。我希望,在低争用情况下,std::mutex的性能会比std::atomic上的顺序一致操作更好。 -
从this output 可以看出,MSVC 正在围绕它放置一个自旋锁互斥体。我不能很容易地实际测试它。
-
哦,好点。看起来这是一个 gcc libatomic 决定。 github.com/gcc-mirror/gcc/blob/… 说“用户可能期望‘无锁’也意味着‘快速’,这就是为什么我们不返回 true,例如,如果我们使用 CAS 实现具有这种大小/对齐的加载。”这正是 x86-64 上 16 字节的情况 - 如果您单步执行
__atomic_load_16,它也会执行lock cmpxchg16b。 -
快速浏览一下代码,我认为MSVC 正在将一个简单的自旋锁作为
atomic<pair<...>>的成员。这就是为什么它是 24 字节而不是 16 字节的原因,如果编译器要使用原子 DCAS 处理它,您会期望它。并且可以在反汇编中看到:rcx指向要处理的atomic<pair<...>>,DWORD PTR [rcx]是锁,数据本身在QWORD PTR [rcx+8]和QWORD PTR [rcx+16]。将此称为“软件事务性内存”似乎太花哨了。
标签: c++ visual-c++ x86-64 atomic stdatomic