【问题标题】:C & low-level semaphore implementationC & 低级信号量实现
【发布时间】:2016-07-05 18:46:34
【问题描述】:

我在考虑如何使用尽可能少的 asm 代码来实现信号量(不是二进制)。
在不使用互斥锁的情况下,我无法成功地思考和编写它,所以这是迄今为止我能做的最好的事情:

全球:

#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef struct
{
    atomic_ullong    value;
    pthread_mutex_t *lock_op;
    bool             ready;
} semaphore_t;

typedef struct
{
    atomic_ullong   value;
    pthread_mutex_t lock_op;
    bool            ready;
} static_semaphore_t;

 /* use with static_semaphore_t */
#define SEMAPHORE_INITIALIZER(value) = {value, PTHREAD_MUTEX_INITIALIZER, true}


功能:

bool semaphore_init(semaphore_t *semaphore, unsigned long long init_value)
{   
    if(semaphore->ready) if(!(semaphore->lock_op = \
                             calloc(1, sizeof(pthread_mutex_t)))) return false;
    else                 pthread_mutex_destroy(semaphore->lock_op);   

    if(pthread_mutex_init(semaphore->lock_op, NULL))
            return false;

    semaphore->value = init_value;
    semaphore->ready = true;
    return true;
}

bool semaphore_wait(semaphore_t *semaphore)
{
    if(!semaphore->ready) return false;

    pthread_mutex_lock(&(semaphore->lock_op));
    while(!semaphore->value) __asm__ __volatile__("nop");
    (semaphore->value)--;
    pthread_mutex_unlock(&(semaphore->lock_op));
    return true;
}

bool semaphore_post(semaphore_t *semaphore)
{
    if(!semaphore->ready) return false;

    atomic_fetch_add(&(semaphore->value), (unsigned long long) 1);
    return true;
}


是否可以仅使用几行、使用原子内置函数或直接在汇编中实现信号量(例如lock cmpxchg)?

查看&lt;semaphore.h&gt; 包含的&lt;bits/sempahore.h&gt; 中的sem_t 结构,在我看来,它选择了一条非常不同的路径...

typedef union
{
    char __size[__SIZEOF_SEM_T];
    long int __align;
} sem_t;




更新:

@PeterCordes 提出了一个绝对更好的解决方案,使用原子,没有互斥体,直接对信号量值进行检查。

我仍然想更好地了解在性能方面改进代码的机会,利用内置暂停函数或内核调用避免 CPU 浪费,等待关键资源可用。

如果有一个标准的互斥锁和非二进制信号量的实现来进行比较也很好。
来自futex(7) 我读到:“Linux 内核提供 futexes(“快速用户空间互斥锁”)作为快速用户空间锁定和信号量的构建块。Futexes 非常基础,非常适合构建更高级别的锁定抽象,例如互斥锁、条件变量、读写锁、屏障和信号量。”

【问题讨论】:

  • 我看不出你在哪里包含stdatomics。不清楚你想完成什么。
  • 您当然可以使用原子操作来进行值比较。但是您仍然需要内核支持来暂停和唤醒进程。除非你做一个繁忙的循环,这不是一个可行的通用解决方案。
  • @Olaf 我想在代码大小和速度方面找到最有效的方法来自己实现非二进制信号量。
  • @kaylum 我想我不能只用一个比较操作来做到这一点,因为当value &lt; nthreads 时 n 个线程可以通过非空检查并使值变为负数,所以,正如你所说,我需要自动检查和递减,为此我使用了互斥锁。至于忙循环/等待,我虽然是 mutex_lock 所做的。您是说没有更有效和更优雅的方法来实现这一点?
  • @JumpAlways "至于忙循环/等待我虽然是 mutex_lock 所做的," 绝对不是!现实世界的低级同步原语是由具有深厚平台知识的人编写的。有十几种方法可以使大多数程序员甚至不知道存在这种错误。有关类似问题的更多信息,请参阅here。你可以天真地生产一个玩具,但不要自欺欺人地认为你会得到好的表现。并且不要认为您可以轻松编写一个基准来区分好坏,因为这也很难。

标签: c multithreading assembly mutex semaphore


【解决方案1】:

请参阅我的最小天真信号量实现,它可能有效。它编译并看起来适合 x86。我认为这对于任何 C11 实现都是正确的。


IIRC,可以通过原子操作访问implement a counting lock (aka semaphore) with just a single integer。该*链接甚至给出了up/down 的算法。您不需要单独的互斥锁。如果atomic_ullong 需要一个互斥体来支持目标 CPU 上的原子递增/递减,它将包括一个。 (这可能是 32 位 x86 上的情况,或者实现使用慢速 cmpxchg8 而不是快速 lock xadd。对于您的信号量来说,32 位计数器真的太小了吗?因为 64 位原子在 32 位机器上会更慢。)

&lt;bits/sempahore.h&gt; 联合定义显然只是一个大小正确的不透明 POD 类型,并不表示实际实现。


正如@David Schwartz 所说,除非您是专家,否则为实际使用实现自己的锁定是一件愚蠢的事情。不过,这可能是一种有趣的方式来了解原子操作并找出标准实现中的底层内容。请仔细注意他的警告,即锁定实现很难测试。您可以使用当前版本的编译器和您选择的编译选项编写适用于您硬件上的测试用例的代码...


ready 布尔值完全浪费空间。如果您可以正确初始化 ready 标志以便函数查看它是有意义的,那么您可以将其他字段初始化为合理的初始状态。

我注意到您的代码还有一些其他问题:

#define SEMAPHORE_INITIALIZER(value) = {value, PTHREAD_MUTEX_INITIALIZER, true};

static_semaphore_t my_lock = SEMAPHORE_INITIALIZER(1);
// expands to my_lock = = {1, PTHREAD_MUTEX_INITIALIZER, true};;
// you should leave out the = and ; in the macro def so it works like a value

使用动态分配的pthread_mutex_t *lock_op 是愚蠢的。使用值,而不是指针。您的大多数锁定功能都使用互斥锁,因此额外的间接级别只会减慢速度。内存与计数器一起存在会更好。互斥体不需要太多空间。


while(!semaphore->value) __asm__ __volatile__("nop");

我们希望此循环避免浪费功率并减慢其他线程甚至其他逻辑线程与超线程共享同一内核的速度。

nop 不会减少忙等待循环的 CPU 密集度。即使使用超线程,它在 x86 上也可能没有区别,因为整个循环体仍然可能适合 4 微指令,因此每个时钟一次迭代都会出现问题,无论那里是否有 nopnop 不需要执行单元,所以至少它不会受到伤害。这种自旋循环发生在持有互斥体的情况下,这看起来很愚蠢。所以第一个服务员会进入这个自旋循环,而之后的服务员会在互斥体上旋转。


这是我对信号量的简单实现,仅使用 C11 原子操作

我认为这是一个很好的实现,它实现了非常有限的正确和小目标(源代码和机器代码),并且不使用其他实际的锁定原语。有些主要领域我什至没有尝试解决(例如公平/饥饿,将 CPU 让给其他线程,可能还有其他东西)。

查看asm output on godboltdown 仅 12 个 x86 insn,up 仅 2 个(包括 rets)。 Godbolt 的非 x86 编译器(ARM/ARM64/PPC 的 gcc 4.8)太旧,无法支持 C11 &lt;stdatomic.h&gt;。 (不过,他们确实有 C++ std::atomic)。所以很遗憾,我无法轻松检查非 x86 上的 asm 输出。

#include <stdatomic.h>
#define likely(x)       __builtin_expect(!!(x), 1)
#define unlikely(x)     __builtin_expect(!!(x), 0)

typedef struct {
  atomic_int val;   // int is plenty big.  Must be signed, so an extra decrement doesn't make 0 wrap to >= 1
} naive_sem_t;

#if defined(__i386__) || defined(__x86_64__)
#include <immintrin.h>
static inline void spinloop_body(void) { _mm_pause(); }  // PAUSE is "rep nop" in asm output
#else
static inline void spinloop_body(void) { }
#endif

void sem_down(naive_sem_t *sem)
{
  while (1) {
    while (likely(atomic_load_explicit(&(sem->val), memory_order_acquire ) < 1))
        spinloop_body();  // wait for a the semaphore to be available
    int tmp = atomic_fetch_add_explicit( &(sem->val), -1, memory_order_acq_rel );  // try to take the lock.  Might only need mo_acquire
    if (likely(tmp >= 1))
        break;              // we successfully got the lock
    else                    // undo our attempt; another thread's decrement happened first
        atomic_fetch_add_explicit( &(sem->val), 1, memory_order_release ); // could be "relaxed", but we're still busy-waiting and want other thread to see this ASAP
  }
}
// note the release, not seq_cst.  Use a stronger ordering parameter if you want it to be a full barrier.
void sem_up(naive_sem_t *sem) {
    atomic_fetch_add_explicit(&(sem->val), 1, memory_order_release);
}

这里的窍门是val暂时过低是可以的;这只会使其他线程旋转。另请注意,fetch_add 是单个原子操作是关键。它返回旧值,因此我们可以检测到 val 在 while 循环的加载和 fetch_add 之间何时被另一个线程占用。 (请注意,我们不需要检查 tmp 是否 == 到 while 循环的负载:如果另一个线程 uped 负载和 fetch_add 之间的信号量很好。这是使用 fetch_add 而不是cmpxchg)。

atomic_load 自旋循环只是对让所有服务员在val 上执行原子读取-修改-写入的性能优化。 (尽管许多服务员尝试使用 inc 进行 dec 然后撤消,但服务员看到锁被解锁的情况非常罕见)。

真正的实现会为更多平台提供特殊的东西,而不仅仅是 x86。对于 x86,可能不仅仅是自旋循环内的 PAUSE 指令。这仍然只是一个完全可移植的 C11 实现的玩具示例。 PAUSE 显然有助于避免对内存排序的错误推测,因此 CPU 在离开自旋循环后运行效率更高。 pause 与将逻辑 CPU 让给操作系统以运行不同的线程不同。也与memory_order_???参数的正确性和选择无关。

一个真正的实现可能会在经过一些旋转迭代(sched_yield(2),或者更可能是futex 系统调用,见下文)后将 CPU 交给操作系统。也许使用 x86 MONITOR / MWAIT 对超线程更加友好;我不知道。我从来没有真正实现过锁定自己,我只是在查找其他 insn 时在 x86 insn 参考中看到所有这些东西。


如前所述,x86 的lock xadd 指令实现了fetch_add(具有顺序一致性语义,因为locked 指令始终是一个完整的内存屏障)。在非 x86 上,仅对 fetch_add 使用获取+释放语义,而不是完全顺序一致性可能允许更高效的代码。我不确定,但仅使用 acquire 很可能会在 ARM64 上实现更高效的代码。我认为我们只需要acquire on the fetch_add, not acq_rel,但我不确定。在 x86 上,代码不会有任何差异,因为 locked 指令是执行原子读取-修改-写入的唯一方法,所以即使 relaxed 将与 seq_cst 相同(compile-time reordering 除外.)


如果你想让出 CPU 而不是旋转,你需要一个系统调用(正如人们所说的那样)。显然,在使标准库锁定在 Linux 上尽可能高效方面已经做了很多工作。有专门的系统调用来帮助内核在释放锁时唤醒正确的线程,而且它们易于使用。 From futex(7):

注意事项
重申一下,裸 futex 并不是最终用户易于使用的抽象。 (这个没有包装函数 系统调用 在 glibc 中。)实现者应具备汇编知识并阅读 futex 用户空间库的源代码 参考如下。


公平/饥饿(我的幼稚实现忽略了)

正如*文章所提到的,某种唤醒队列是个好主意,所以同一个线程不会每次都得到信号量。 (释放后快速获取锁的代码通常会让释放线程在其他线程仍处于休眠状态时获得锁)。

这是进程中内核协作的另一个主要好处 (futex)。

【讨论】:

  • 来自您提供的链接:"如果实现不能确保递增、递减和比较操作的原子性,则存在递增或递减被遗忘或信号量的风险值变为负值。原子性可以通过使用能够在单个操作中读取、修改和写入信号量的机器指令来实现。在没有这样的硬件指令的情况下,可以通过使用软件来合成原子操作互斥算法。”
  • “在单处理器系统上,原子操作可以通过暂时挂起抢占或禁用硬件中断来确保。这种方法在多处理器系统上不起作用,因为在多处理器系统上可能有两个程序共享一个信号量运行同时在不同的处理器上。为了在多处理器系统中解决这个问题,可以使用锁定变量来控制对信号量的访问。使用 test-and-set-lock 命令来操作锁定变量。"
  • 您需要的不仅仅是原子减法,因为如果在 dec 操作之前,比信号量值更多的线程可以通过 not null 测试,然后全部递减 sem 值,使其变为负数,而如果你在减量之后进行测试,sem 值可以变为负数(
  • 我不得不说我不想使用自己的实现,但我想深入了解这个话题,所以挑战是找出如何更有效地实现自己的信号量尽可能少的代码,也许能够将差异与实际的标准实现进行比较。
  • 关于性能,我仍然不明白为什么在 inf 循环中使用快速操作需要更少的 CPU,我的意思是暂停线程并以某种方式等待中断,但循环一些代码不管它是什么对我来说都是一样的,除非你添加一个智能时间睡眠。