【问题标题】:Fastest inline-assembly spinlock最快的内联组装自旋锁
【发布时间】:2012-08-11 03:53:35
【问题描述】:

我正在用 c++ 编写一个多线程应用程序,其中性能至关重要。在线程之间复制小型结构时,我需要使用大量锁定,为此我选择使用自旋锁。

我对此进行了一些研究和速度测试,我发现大多数实现的速度大致相同:

  • Microsoft 的 CRITICAL_SECTION,将 SpinCount 设置为 1000,得分约为 140 个时间单位
  • 使用 Microsoft 的 InterlockedCompareExchange 实现 this algorithm 得分约为 95 个时间单位
  • 我还尝试使用 __asm {} 之类的 this code 之类的内联程序集,它的得分约为 70 个时间单位,但我不确定是否已创建适当的内存屏障。

编辑:这里给出的时间是 2 个线程锁定和解锁自旋锁 1,000,000 次所需的时间。

我知道这并没有太大区别,但由于自旋锁是一个使用频率很高的对象,人们会认为程序员会同意最快的方法来制作自旋锁。然而,谷歌搜索它会导致许多不同的方法。如果使用内联汇编并使用指令CMPXCHG8B 而不是比较 32 位寄存器来实现,我认为this aforementioned method 将是最快的。 还必须考虑内存屏障,这可以通过 LOCK CMPXHG8B(我认为?) 来完成,它保证了内核之间共享内存的“独占权”。最后[有人建议] 忙等待应该伴随着 NOP:REP 这将使超线程处理器能够切换到另一个线程,但我不确定这是否正确?

从我对不同自旋锁的性能测试来看,差别不大,但出于纯粹的学术目的,我想知道哪一个最快。但是,由于我在汇编语言和内存屏障方面的经验非常有限,如果有人可以为我提供的最后一个示例编写汇编代码,我会很高兴,其中包含 LOCK CMPXCHG8B 和 适当的内存屏障以下模板:

__asm
{
     spin_lock:
         ;locking code.
     spin_unlock:
         ;unlocking code.
}

【问题讨论】:

  • +1 用于在询问之前提供良好的资源和信息。我认为你给的比你需要的多。谢谢
  • 多少到底是多少?担心你可以旋转多快需要非常多。您确定没有更好的方法可以在此处限制访问?还要记住,您旋转的速度不会影响您实际获得锁的时间。不管你旋转多快,其他人必须先解锁它。如果事实证明您将长时间旋转,请考虑循环 yield() 以将执行传递给另一个正在运行的线程或进程。
  • rep nop aka pause 也使 P4 在您离开旋转循环时不会做迟钝的事情。英特尔的手册明确建议在自旋等待循环中使用它。你可以使用XACQUIREXRELEASE(在 Haswell 之前不可用)吗?
  • @Wug 性能测试中给出的时间,是 2 个线程同时锁定、复制 4 个整数(以增加真实性)和解锁自旋锁的时间可能 10000000 次(我不是来源这台电脑上的代码)。给出的时间单位没有提供任何关于已经运行了多少循环的信息。
  • 当你想要性能时,在你的快速路径上使用无锁/无争用数据结构,并且只在你的慢速路径上加锁

标签: c++ assembly x86 memory-barriers spinlock


【解决方案1】:

尽管已经有一个公认的答案,但仍有一些遗漏的东西可以用来改进所有答案,取自this Intel article, all above fast lock implementation

  1. 旋转易失性读取,而不是原子指令,这样可以避免不必要的总线锁定,尤其是在高度竞争的锁定上。
  2. 对竞争激烈的锁使用回退
  3. 内联锁,最好使用内联 asm 有害的编译器(基本上是 MSVC)的内在函数。

【讨论】:

    【解决方案2】:

    我通常不会抱怨有人努力实现快速代码:这通常是一个很好的练习,可以更好地理解编程和更快的代码。

    我也不会在这里抱怨,但我可以明确指出,快速自旋锁 3 条指令或更多指令的问题是 - 至少在 x86 架构上 - 是徒劳的。

    原因如下:

    使用典型的代码序列调用自旋锁

    lock_variable DW 0    ; 0 <=> free
    
    mov ebx,offset lock_variable
    mov eax,1
    xchg eax,[ebx]
    
    ; if eax contains 0 (no one owned it) you own the lock,
    ; if eax contains 1 (someone already does) you don't
    

    释放自旋锁很简单

    mov ebx,offset lock_variable
    mov dword ptr [ebx],0
    

    xchg 指令提升处理器上的锁定引脚,这实际上意味着我希望在接下来的几个时钟周期内使用总线。该信号通过高速缓存传递到最慢的总线主控设备,通常是 PCI 总线。当每个总线主控设备完成后,locka(锁定确认)信号就会被发回。然后进行实际的交换。问题是 lock/locka 序列需要很长时间。 PCI 总线可能以 33MHz 运行,有几个延迟周期。在 3.3 GHz CPU 上,这意味着每个 PCI 总线周期需要一百个 CPU 周期。

    根据经验,我假设一个锁需要 300 到 3000 个 CPU 周期才能完成,最后我什至不知道我是否会拥有这个锁。因此,您可以通过“快速”自旋锁节省的几个周期将是海市蜃楼,因为没有像下一个锁那样的锁,这取决于您在短时间内的总线情况。

    ________________编辑________________

    我刚刚读到自旋锁是一个“大量使用的对象”。好吧,您显然不明白自旋锁在每次调用时都会消耗大量的 CPU 周期。或者,换一种说法,每次调用它都会失去大量的处理能力。

    使用自旋锁(或它们更大的兄弟,临界区)的技巧是尽可能少地使用它们,同时仍能实现预期的程序功能。在所有地方使用它们很容易,结果你的性能会很差。

    不仅仅是编写快速代码,还包括组织数据。当您编写“在线程之间复制小型结构”时,您应该意识到完成锁定可能需要比实际复制长数百倍的时间。

    ________________编辑________________

    当您计算平均锁定时间时,它可能会说得很少,因为它是在您的机器上测量的,这可能不是预期的目标(可能具有完全不同的总线使用特性)。对于您的机器,平均值将由各个非常快的时间(当总线主控活动没有干扰时)一直到非常慢的时间(当总线主控干扰很大时)组成。

    您可以引入确定最快和最慢情况的代码并计算商以查看自旋锁时间的变化幅度。

    ________________编辑________________

    2016 年 5 月更新。

    Peter Cordes 提出了“在非竞争情况下调整锁是有意义的”的想法,并且现代 CPU 上不会出现数百个时钟周期的锁定时间,除非在锁定变量未对齐的情况下。我开始怀疑我的previous test program(用 32 位 Watcom C 编写)是否会受到 WOW64 的阻碍,因为它在 64 位操作系统:Windows 7 上运行。

    于是我写了一个64位的程序,用TDM的gcc 5.3编译。该程序使用隐式总线锁定指令变体“XCHG r,m”进行锁定,使用简单的赋值“MOV m,r”进行解锁。在某些锁变体中,锁变量经过预先测试以确定尝试锁是否可行(使用简单的比较“CMP r,m”,可能不会冒险到 L3 之外)。这里是:

    // compiler flags used:
    
    // -O1 -m64 -mthreads -mtune=k8 -march=k8 -fwhole-program -freorder-blocks -fschedule-insns -falign-functions=32 -g3 -Wall -c -fmessage-length=0
    
    #define CLASSIC_BUS_LOCK
    #define WHILE_PRETEST
    //#define SINGLE_THREAD
    
    typedef unsigned char      u1;
    typedef unsigned short     u2;
    typedef unsigned long      u4;
    typedef unsigned int       ud;
    typedef unsigned long long u8;
    typedef   signed char      i1;
    typedef          short     i2;
    typedef          long      i4;
    typedef          int       id;
    typedef          long long i8;
    typedef          float     f4;
    typedef          double    f8;
    
    #define usizeof(a) ((ud)sizeof(a))
    
    #define LOOPS 25000000
    
    #include <stdio.h>
    #include <windows.h>
    
    #ifndef bool
    typedef signed char bool;
    #endif
    
    u8 CPU_rdtsc (void)
    {
      ud tickl, tickh;
      __asm__ __volatile__("rdtsc":"=a"(tickl),"=d"(tickh));
      return ((u8)tickh << 32)|tickl;
    }
    
    volatile u8 bus_lock (volatile u8 * block, u8 value)
    {
      __asm__ __volatile__( "xchgq %1,%0" : "=r" (value) : "m" (*block), "0" (value) : "memory");
    
      return value;
    }
    
    void bus_unlock (volatile u8 * block, u8 value)
    {
      __asm__ __volatile__( "movq %0,%1" : "=r" (value) : "m" (*block), "0" (value) : "memory");
    }
    
    void rfence (void)
    {
      __asm__ __volatile__( "lfence" : : : "memory");
    }
    
    void rwfence (void)
    {
      __asm__ __volatile__( "mfence" : : : "memory");
    }
    
    void wfence (void)
    {
      __asm__ __volatile__( "sfence" : : : "memory");
    }
    
    volatile bool LOCK_spinlockPreTestIfFree (const volatile u8 *lockVariablePointer)
    {
      return (bool)(*lockVariablePointer == 0ull);
    }
    
    volatile bool LOCK_spinlockFailed (volatile u8 *lockVariablePointer)
    {
      return (bool)(bus_lock (lockVariablePointer, 1ull) != 0ull);
    }
    
    void LOCK_spinlockLeave (volatile u8 *lockVariablePointer)
    {
      *lockVariablePointer = 0ull;
    }
    
    static volatile u8 lockVariable = 0ull,
                       lockCounter =  0ull;
    
    static volatile i8 threadHold = 1;
    
    static u8 tstr[4][32];    /* 32*8=256 bytes for each thread's parameters should result in them residing in different cache lines */
    
    struct LOCKING_THREAD_STRUCTURE
    {
      u8 numberOfFailures, numberOfPreTests;
      f8 clocksPerLock, failuresPerLock, preTestsPerLock;
      u8 threadId;
      HANDLE threadHandle;
      ud idx;
    } *lts[4] = {(void *)tstr[0], (void *)tstr[1], (void *)tstr[2], (void *)tstr[3]};
    
    DWORD WINAPI locking_thread (struct LOCKING_THREAD_STRUCTURE *ltsp)
    {
      ud n = LOOPS;
      u8 clockCycles;
    
      SetThreadAffinityMask (ltsp->threadHandle, 1ull<<ltsp->idx);
    
      while (threadHold) {}
    
      clockCycles = CPU_rdtsc ();
      while (n)
      {
        Sleep (0);
    
    #ifdef CLASSIC_BUS_LOCK
        while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
    #else
    #ifdef WHILE_PRETEST
        while (1)
        {
          do
          {
            ++ltsp->numberOfPreTests;
          } while (!LOCK_spinlockPreTestIfFree (&lockVariable));
    
          if (!LOCK_spinlockFailed (&lockVariable)) break;
          ++ltsp->numberOfFailures;
        }
    #else
        while (1)
        {
          ++ltsp->numberOfPreTests;
          if (LOCK_spinlockPreTestIfFree (&lockVariable))
          {
            if (!LOCK_spinlockFailed (&lockVariable)) break;
            ++ltsp->numberOfFailures;
          }
        }
    #endif
    #endif
        ++lockCounter;
        LOCK_spinlockLeave (&lockVariable);
    
    #ifdef CLASSIC_BUS_LOCK
        while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
    #else
    #ifdef WHILE_PRETEST
        while (1)
        {
          do
          {
            ++ltsp->numberOfPreTests;
          } while (!LOCK_spinlockPreTestIfFree (&lockVariable));
    
          if (!LOCK_spinlockFailed (&lockVariable)) break;
          ++ltsp->numberOfFailures;
        }
    #else
        while (1)
        {
          ++ltsp->numberOfPreTests;
          if (LOCK_spinlockPreTestIfFree (&lockVariable))
          {
            if (!LOCK_spinlockFailed (&lockVariable)) break;
            ++ltsp->numberOfFailures;
          }
        }
    #endif
    #endif
        --lockCounter;
        LOCK_spinlockLeave (&lockVariable);
    
        n-=2;
      }
      clockCycles = CPU_rdtsc ()-clockCycles;
    
      ltsp->clocksPerLock =   (f8)clockCycles/           (f8)LOOPS;
      ltsp->failuresPerLock = (f8)ltsp->numberOfFailures/(f8)LOOPS;
      ltsp->preTestsPerLock = (f8)ltsp->numberOfPreTests/(f8)LOOPS;
    
    //rwfence ();
    
      ltsp->idx = 4u;
    
      ExitThread (0);
      return 0;
    }
    
    int main (int argc, char *argv[])
    {
      u8 processAffinityMask, systemAffinityMask;
    
      memset (tstr, 0u, usizeof(tstr));
    
      lts[0]->idx = 3;
      lts[1]->idx = 2;
      lts[2]->idx = 1;
      lts[3]->idx = 0;
    
      GetProcessAffinityMask (GetCurrentProcess(), &processAffinityMask, &systemAffinityMask);
    
      SetPriorityClass (GetCurrentProcess(), HIGH_PRIORITY_CLASS);
      SetThreadAffinityMask (GetCurrentThread (), 1ull);
    
      lts[0]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[0], 0, (void *)&lts[0]->threadId);
    #ifndef SINGLE_THREAD
      lts[1]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[1], 0, (void *)&lts[1]->threadId);
      lts[2]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[2], 0, (void *)&lts[2]->threadId);
      lts[3]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[3], 0, (void *)&lts[3]->threadId);
    #endif
    
      SetThreadAffinityMask (GetCurrentThread (), processAffinityMask);
    
      threadHold = 0;
    
    #ifdef SINGLE_THREAD
      while (lts[0]->idx<4u) {Sleep (1);}
    #else
      while (lts[0]->idx+lts[1]->idx+lts[2]->idx+lts[3]->idx<16u) {Sleep (1);}
    #endif
    
      printf ("T0:%1.1f,%1.1f,%1.1f\n", lts[0]->clocksPerLock, lts[0]->failuresPerLock, lts[0]->preTestsPerLock);
      printf ("T1:%1.1f,%1.1f,%1.1f\n", lts[1]->clocksPerLock, lts[1]->failuresPerLock, lts[1]->preTestsPerLock);
      printf ("T2:%1.1f,%1.1f,%1.1f\n", lts[2]->clocksPerLock, lts[2]->failuresPerLock, lts[2]->preTestsPerLock);
      printf ("T3:%1.1f,%1.1f,%1.1f\n", lts[3]->clocksPerLock, lts[3]->failuresPerLock, lts[3]->preTestsPerLock);
    
      printf ("T*:%1.1f,%1.1f,%1.1f\n", (lts[0]->clocksPerLock+  lts[1]->clocksPerLock+  lts[2]->clocksPerLock+  lts[3]->clocksPerLock)/  4.,
                                        (lts[0]->failuresPerLock+lts[1]->failuresPerLock+lts[2]->failuresPerLock+lts[3]->failuresPerLock)/4.,
                                        (lts[0]->preTestsPerLock+lts[1]->preTestsPerLock+lts[2]->preTestsPerLock+lts[3]->preTestsPerLock)/4.);
    
      printf ("LC:%u\n", (ud)lockCounter);
    
      return 0;
    }
    

    该程序在基于 DELL i5-4310U 的计算机上运行,​​该计算机具有 DDR3-800、2 个内核/2 个 2.7GHz HT 和一个通用 L3 缓存。

    首先,WOW64 的影响似乎可以忽略不计。

    执行非竞争锁定/解锁的单个线程能够每 110 个周期执行一次。调整非竞争锁是没有用的:任何为增强单个 XCHG 指令而添加的代码只会使其变慢。

    随着四个 HT 用锁尝试轰炸锁变量,情况发生了根本性的变化。实现成功锁定所需的时间跃升至 994 个周期,其中很大一部分可归因于 2.2 次失败的锁定尝试。换句话说,在高争用情况下,平均必须尝试 3.2 次锁才能获得成功的锁。显然110个周期并没有变成110*3.2而是更接近110*9。因此,其他机制在这里发挥作用,就像在旧机器上的测试一样。此外,平均 994 个周期包含 716 和 1157 之间的范围

    实现预测试的锁变体大约需要最简单变体 (XCHG) 所需周期的 95%。平均而言,他们会执行 17 次 CMP,以发现尝试 1.75 次锁定是可行的,其中 1 次成功。我建议使用预测试,不仅因为它更快:它对总线锁定机制施加的压力更小(锁定尝试次数减少 3.2-1.75=1.45),尽管它略微增加了复杂性。

    【讨论】:

    • xchg 带有内存操作数和其他 locked 指令,在现代 x86 CPU 上并没有那么慢。它们与缓存一致,但与 DMA 不一致,因此它们不必等待 DRAM 访问。 (绝对不是 PCI 总线周期,除非您在内存映射 IO 地址上使用它!)。例如根据 Agner Fog 的表格,在 Sandybridge(2012 年的最新版本)上,xchg 的内存延迟为 25 个周期(在缓存命中时)。如果当前内核有处于 Exclusive 状态的缓存行,那么原子 RMW 很容易,而需要时间的是内存屏障部分。
    • xchg 的最坏情况是,如果另一个内核的缓存行处于修改或独占状态,而 locked 指令正在运行,或者可能没有内核缓存该行,所以它有从主存储器中读取。请注意,它不必必须回主存;您最终会在当前内核的 L1 中获得处于 M 状态的缓存行。
    • @Peter Cordes:那么,为什么 AF 会在指令表第 1 页的底部写下 - “即使在单个处理器系统上,LOCK 前缀通常也会花费一百多个时钟周期。这也适用于带有内存操作数的 XCHG 指令。”?我猜 Agner Fog 可以被认为是对与错……它是什么?
    • 最有效的锁是你设法避免的——如果你足够熟练的话。新手和不了解它们的人会认为它们没有显着的成本,并且比其他人更频繁地使用它们,就像大多数编写多线程的人最终会得到一个较慢的应用程序一样。最后,您的代码可能必须在总线上发送 LOCK 信号并等待最慢的总线主控设备返回 LOCKA 以说明锁已到位。或者您是否声称这种情况不会发生?如果您的锁变量已从缓存中逐出怎么办?
    • OP 仍然写道:“在线程之间复制小型结构时,我需要使用大量锁定,为此我选择使用自旋锁”。尽管我的代码显示四个内核中的每一个内核都可以实现每秒 270 万次锁定/解锁,但 OP 使用如此多的内核并非不可想象,以至于他的多线程代码在没有锁定的情况下使用单线程会更好。
    【解决方案3】:

    维基百科有一篇关于自旋锁的好文章,这里是 x86 实现

    http://en.wikipedia.org/wiki/Spinlock#Example_implementation

    请注意,它们的实现不使用“lock”前缀,因为它在 x86 上对于“xchg”指令是多余的 - 它隐含地具有锁定语义,正如 Stackoverflow 讨论中所讨论的:

    On a multicore x86, is a LOCK necessary as a prefix to XCHG?

    REP:NOP 是 PAUSE 指令的别名,您可以在此处了解更多信息

    How does x86 pause instruction work in spinlock *and* can it be used in other scenarios?

    关于内存屏障的问题,这里有你可能想知道的一切

    内存屏障:软件黑客的硬件视图,Paul E. McKenney

    http://irl.cs.ucla.edu/~yingdi/paperreading/whymb.2010.06.07c.pdf

    【讨论】:

    • 在您链接到维基百科实现的代码中,为什么将内存中的锁定值设置为0 的指令必须是原子的(使用xchg 指令)?似乎即使另一个线程在设置为 0 之前访问了内存位置,它所看到的只是锁仍然被锁定 - 不会直接 ​​mov 到内存位置工作吗?
    • 是的,mov 适用于新的 x86 处理器,但不适用于某些较旧的处理器,这将在下一节题为“重要优化”On later implementations of the x86 architecture, spin_unlock can safely use an unlocked MOV instead of the slower locked XCHG. This is due to subtle memory ordering rules which support this, even though MOV is not a full memory barrier. However, some processors (some Cyrix processors, some revisions of the Intel Pentium Pro (due to bugs), and earlier Pentium and i486 SMP systems) will do the wrong thing and data protected by the lock could be corrupted. 中进行解释
    • 感谢您指出这一点。如果你不介意我问,这些“微妙的内存排序规则”避免了什么问题?
    • X86 内存排序要求一个线程的存储以相同的顺序被其他线程看到,这意味着另一个线程无法获得锁而看到存储在释放锁之前在临界区完成。见本文第(4)节spinroot.com/spin/Doc/course/x86_tso.pdf
    • 顺便说一句,我发现 MOV 就足够了(不需要序列化或内存屏障)这一事实在 1999 年英特尔架构师lkml.org/lkml/1999/11/24/90 对 Linus Torvalds 的回复中“正式”得到了澄清我猜后来发现它不适用于某些较旧的 x86 处理器。
    【解决方案4】:

    看看这里: x86 spinlock using cmpxchg

    感谢科里·纳尔逊

    __asm{
    spin_lock:
    xorl %ecx, %ecx
    incl %ecx
    spin_lock_retry:
    xorl %eax, %eax
    lock; cmpxchgl %ecx, (lock_addr)
    jnz spin_lock_retry
    ret
    
    spin_unlock:
    movl $0 (lock_addr)
    ret
    }
    

    另一个消息来源说: http://www.geoffchappell.com/studies/windows/km/cpu/cx8.htm

           lock    cmpxchg8b qword ptr [esi]
    is replaceable with the following sequence
    
    try:
            lock    bts dword ptr [edi],0
            jnb     acquired
    wait:
            test    dword ptr [edi],1
            je      try
            pause                   ; if available
            jmp     wait
    
    acquired:
            cmp     eax,[esi]
            jne     fail
            cmp     edx,[esi+4]
            je      exchange
    
    fail:
            mov     eax,[esi]
            mov     edx,[esi+4]
            jmp     done
    
    exchange:
            mov     [esi],ebx
            mov     [esi+4],ecx
    
    done:
            mov     byte ptr [edi],0
    

    这里是关于无锁与锁实现的讨论: http://newsgroups.derkeiler.com/Archive/Comp/comp.programming.threads/2011-10/msg00009.html

    【讨论】:

    • 你会在哪里放置 PAUSE (NOP:REP) 指令?在循环内部还是之前?
    • 代替“等待”部分的“暂停”。但据我所知,对于较旧的 cpu
    • 此实现在尝试交换之前基于lock cmpxchg 而不是原子负载。 (参见大墓地的回答)。 pause 避免了当您旋转负载而不是 locked 指令时的内存排序错误推测。见my answer on another question for a simple/minimal asm spinlock implementation that I think avoids any obvious problems
    • 这也适用于 64 位吗?为什么只说 x86 spinlock?
    • @Goaler444 x86 表示通用架构名称,也应该适用于 x86_64,使用 cmpxchgq 而不是 cmpxchgl 的 64 位 lock_addr
    【解决方案5】:

    只是问:

    在深入研究自旋锁和几乎无锁的数据结构之前:

    您是否 - 在您的基准测试和应用程序中 - 确保保证竞争线程在不同的内核上运行?

    如果不是这样,您最终可能会得到一个在您的开发机器上运行良好但在该领域很糟糕/失败的程序,因为一个线程必须同时是自旋锁的锁定器和解锁器。

    给你一个数字:在 Windows 上,你有 10 毫秒的标准时间片。如果您不确定锁定/解锁涉及两个物理线程,那么您最终将每秒锁定/解锁大约 500 次,这个结果将非常meh

    【讨论】:

    • 当然,一个线程必须锁定/解锁自旋锁。否则它不会保护任何东西。而且没有理由让线程获取锁,取消调度,然后为每个时间片解锁。
    猜你喜欢
    • 1970-01-01
    • 2014-11-15
    • 2014-05-21
    • 2013-01-01
    • 2013-09-27
    • 2010-09-29
    • 2016-08-12
    • 1970-01-01
    • 2021-11-28
    相关资源
    最近更新 更多