【问题标题】:Implementing a FIFO mutex in pthreads在 pthread 中实现 FIFO 互斥锁
【发布时间】:2024-01-02 00:50:01
【问题描述】:

我正在尝试实现支持并发插入的二叉树(即使在节点之间也可能发生),但不必为每个节点分配全局锁或单独的互斥锁或互斥锁。相反,分配的此类锁的数量应该是使用树的线程数量的顺序。

因此,我最终遇到了一种lock convoy 问题。更简单地说,当两个或更多线程执行以下操作时,可能会发生这种情况:

1 for(;;) { 2 锁(互斥锁) 3 do_stuff 4 解锁(互斥) 5 }

也就是说,如果 Thread#1 在一次“cpu burst”中执行指令 4->5->1->2,那么 Thread#2 将无法执行。

另一方面,如果在 pthread 中为 mutexes 提供 FIFO 类型的锁定选项,则可以避免此类问题。那么,有没有办法在 pthreads 中实现 FIFO 类型的互斥锁?改变线程优先级可以做到这一点吗?

【问题讨论】:

  • 解锁后投个sched_yield()怎么样?至少,最近拥有锁的那个会释放 CPU,让其他人有机会获得锁。如果没有,那么线程将被安排再次运行。

标签: c pthreads mutex


【解决方案1】:

您可以实现一个公平排队系统,其中每个线程在阻塞时都被添加到队列中,并且队列中的第一个线程总是在可用时获取资源。这种基于 pthread 原语构建的“公平”票证锁可能如下所示:

#include <pthread.h>

typedef struct ticket_lock {
    pthread_cond_t cond;
    pthread_mutex_t mutex;
    unsigned long queue_head, queue_tail;
} ticket_lock_t;

#define TICKET_LOCK_INITIALIZER { PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER }

void ticket_lock(ticket_lock_t *ticket)
{
    unsigned long queue_me;

    pthread_mutex_lock(&ticket->mutex);
    queue_me = ticket->queue_tail++;
    while (queue_me != ticket->queue_head)
    {
        pthread_cond_wait(&ticket->cond, &ticket->mutex);
    }
    pthread_mutex_unlock(&ticket->mutex);
}

void ticket_unlock(ticket_lock_t *ticket)
{
    pthread_mutex_lock(&ticket->mutex);
    ticket->queue_head++;
    pthread_cond_broadcast(&ticket->cond);
    pthread_mutex_unlock(&ticket->mutex);
}

【讨论】:

  • 不幸的是,问题出在互斥体本身上。正如 Logan Capaldo 所提到的,构建更复杂的锁只会“只会降低所描述的问题的可能性,而不会真正阻止它”。当然,我可以实现条件变量或票证锁定类型“等待队列”,但这不会阻止问题。您的票证锁定基于相同的“pthread mutex”,其争用可能导致问题中描述的线程饥饿。
  • @ManRow:再看一遍。在这种方案下,同一个线程只有在没有其他线程竞争的情况下才能立即重新获取锁。这确保了饥饿不会发生——因为一旦线程调用ticket_lock(),它就保证在当前持有者重新获得锁之前获得锁。
  • @caf:一点也不。当前持有者完全有能力在同一个 cpu 突发中释放锁重新获取它(参见原始问题中的代码!!)。为了调用“ticket_lock”,任何线程必须首先获取互斥体。为什么当前持有者不能连续执行此操作并将整个队列留给自己?另一个线程甚至可以进入锁的保证在哪里?毕竟,解锁互斥锁只会让等待它的线程“准备好”,而不是“运行”!
  • @ManRow:如果您担心线程 A 在 ticket_lock() 中的第一个 pthread_mutex_lock() 处挂起,而线程 B 多次运行 ticket_lock(); do_work(); ticket_unlock(); 直到它的时间片用完了,这与线程 A 在它调用 ticket_lock() 之前被暂停的点无法区分,而线程 B 会用完它的时间片。在后一种情况下,线程 B 甚至还没有注册它对锁的兴趣。
  • @caf:但是你无法告诉在哪里线程 B 完成了它的时间片!如果线程 B 一直在锁的关键部分完成,那么无论如何它都会让线程 A 饿死。您必须确保 very mutex 本身 以 FIFO 顺序锁定,这是我的问题所要求的。
【解决方案2】:

你可以这样做:

  • 定义一个“排队锁”,它由一个空闲/忙碌标志加上一个 pthread 条件变量的链表组成。对 queued_lock 的访问受到互斥锁的保护

  • 锁定 queued_lock:

    • 抓住互斥锁
    • 检查“忙碌”标志
    • 如果不忙;设置忙碌=真;释放互斥锁;完成
    • 如果忙;在队列末尾创建一个新条件并等待它(释放互斥锁)
  • 解锁:

    • 抓住互斥锁
    • 如果没有其他线程排队,busy = false;释放互斥锁;完成
    • pthread_cond_signal 第一个等待条件
    • 清除“忙碌”标志 - 所有权正在传递给其他线程
    • 释放互斥锁
  • 当等待线程被 pthread_cond_signal 解除阻塞时:

    • 从队列头部移除我们的条件变量
    • 释放互斥锁

请注意,互斥锁仅在 queued_lock 的状态被更改时被锁定,而不是在 queued_lock 被持有的整个持续时间内。

【讨论】:

  • 这不只是降低了所描述的问题的可能性并且实际上并没有阻止它吗?我认为您的方法几乎就在那里,如果我们要添加一个额外的 condvar,我们在 unlock 中等待标志再次变得忙碌,如果互斥锁被争用(condvars 列表非空)。通过这种方式,我们保证在我们有能力再次尝试获取它之前,我们已经将锁交给了其他人。
  • 不。我也有同样的问题。即使互斥锁 没有 争用,您也必须等待那个额外的 condvar,否则您仍然会以一个线程太快而结束。然后你有一个问题,你如何停止线程。我想你可以有一个不等待的解锁和一个 fair_unlock,当你要跳出循环时调用 unlock。
  • 其实解锁可能是处理这个错误的地方。解锁后你可能有有趣的工作要做,你没有理由站在那里让别人拿锁。互斥锁应该跟踪最后持有锁的线程。如果是你,锁的时候,假装别人有锁,把自己加到服务员中,而不是拿走。
  • 是的,我认为跟踪谁最后持有锁的想法正朝着正确的方向前进。不过,我认为您不能将自己添加到服务员中——如果没有其他人来叫醒您,请等一下?但是,如果您在允许同一个线程重新锁定互斥锁之前执行了sched_yield 怎么办?这应该确保任何其他正在旋转的试图获取互斥锁的线程都有机会。
【解决方案3】:

您可以通过@caf 所描绘的想法获得一个公平的互斥锁,但在执行实际锁定之前使用原子操作来获取票证。

#if defined(_MSC_VER)
typedef volatile LONG Sync32_t;
#define SyncFetchAndIncrement32(V) (InterlockedIncrement(V) - 1)
#elif (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + __GNUC_PATCHLEVEL__) > 40100
typedef volatile uint32_t Sync32_t;
#define SyncFetchAndIncrement32(V) __sync_fetch_and_add(V, 1)
#else
#error No atomic operations
#endif

class FairMutex {
private:
    Sync32_t                _nextTicket;
    Sync32_t                _curTicket;
    pthread_mutex_t         _mutex;
    pthread_cond_t          _cond;

public:
    inline FairMutex() : _nextTicket(0), _curTicket(0), _mutex(PTHREAD_MUTEX_INITIALIZER), _cond(PTHREAD_COND_INITIALIZER)
    {
    }
    inline ~FairMutex()
    {
        pthread_cond_destroy(&_cond);
        pthread_mutex_destroy(&_mutex);
    }
    inline void lock()
    {
        unsigned long myTicket = SyncFetchAndIncrement32(&_nextTicket);
        pthread_mutex_lock(&_mutex);
        while (_curTicket != myTicket) {
            pthread_cond_wait(&_cond, &_mutex);
        }
    }
    inline void unlock()
    {
        _curTicket++;
        pthread_cond_broadcast(&_cond);
        pthread_mutex_unlock(&_mutex);
    }
};

更广泛地说,我不会将其称为 FIFO 互斥体,因为它给人的印象是维持原本不存在的顺序。如果您的线程并行调用 lock(),则它们在调用锁之前不能有顺序,因此创建互斥体保留不存在的顺序关系是没有意义的。

【讨论】:

  • gcc 大小写错误,您应该在sync_add_and_fetch 之后添加一个- 1,就像在Windows 调用中一样。
【解决方案4】:

您发布的示例没有解决方案。基本上你只有一个关键部分,没有并行的地方。

也就是说,您会发现将线程保持互斥锁的时间缩短到最低限度非常重要,只需少量指令即可。这对于插入诸如树之类的动态数据结构是很困难的。概念上最简单的解决方案是为每个树节点设置一个读写锁。

如果您不想为每个树节点设置单独的锁,您可以在树的每一层设置一个锁结构。我会为此尝试读写锁。当您遍历树时,您可以只使用手头节点级别(加上下一个级别)的读锁定。然后,当您找到合适的插入锁定该级别以进行写入时。

【讨论】:

    【解决方案5】:

    解决方案可能是使用atomic operations。没有锁定,没有上下文切换,没有睡眠,并且比互斥锁或条件变量快得多。原子操作并不是所有问题的最终解决方案,但我们已经创建了许多仅使用原子操作的通用数据结构的线程安全版本。他们非常快。

    原子操作是一系列简单的操作,如递增、递减或赋值,保证在多线程环境中原子执行。如果两个线程同时命中操作,cpu 会确保一个线程一次执行操作。原子操作是硬件指令,所以它们很快。 “比较和交换”对于线程安全的数据结构非常有用。在我们的测试中,原子比较和交换大约与 32 位整数赋值一样快。也许慢 2 倍。考虑到互斥体消耗了多少 CPU 时,原子操作的速度无限快。

    用原子操作来平衡你的树并不是一件容易的事,但也不是不可能的。我过去遇到过这个要求,并通过使线程安全skiplist 作弊,因为可以通过原子操作轻松完成跳过列表。抱歉,我不能给你一份我们的代码的副本……我的公司会解雇我,但你自己做很容易。

    可以通过简单的线程安全链表示例来可视化原子操作如何工作以生成无锁数据结构。在不使用锁的情况下将项目添加到全局链表 (_pHead)。首先保存一份_pHead, pOld。在执行并发操作时,我将这些副本视为“世界状态”。接下来创建一个新节点 pNew,并将其 pNext 设置为 COPY。然后使用原子“比较和交换”将 _pHead 更改为 pNew,仅当 pHead 仍然是 pOld 时。仅当 _pHead 未更改时,原子操作才会成功。如果失败,则循环返回以获取新 _pHead 的副本并重复。

    如果操作成功,世界其他地方现在将看到一个新的负责人。如果一个线程在一纳秒之前获得了旧的头,该线程将不会看到新项目,但列表仍然可以安全地迭代。由于我们在将新项目添加到列表之前将 pNext 预设为旧头,因此如果线程在我们添加新头后一纳秒内看到新头,则可以安全地遍历列表。

    全球的东西:

    typedef struct _TList {
      int data;
      struct _TList *pNext;
    } TList;
    
    TList *_pHead;
    

    加入列表:

    TList *pOld, *pNew;
    ...
    // allocate/fill/whatever to make pNew
    ...
    while (1) { // concurrency loop
      pOld = _pHead;  // copy the state of the world. We operate on the copy
      pNew->pNext = pOld; // chain the new node to the current head of recycled items
      if (CAS(&_pHead, pOld, pNew))  // switch head of recycled items to new node
        break; // success
    }
    

    CAS 是__sync_bool_compare_and_swap 等的简写。看看有多容易?没有互斥锁...没有锁!在极少数情况下,两个线程同时访问该代码,一个简单地循环第二次。我们只看到第二个循环,因为调度程序在并发循环中交换了一个线程。所以它很少见且无关紧要。

    可以以类似的方式将事物从链表的头部拉出。如果您使用联合,您可以原子地更改多个值,并且您可以使用最多 128 位的原子操作。我们在 32 位 redhat linux 上测试了 128 位,它们的速度与 32、64 位原子操作的速度相同。

    您必须弄清楚如何将这种技术用于您的树。一个 b 树节点将有两个指向子节点的 ptr。您可以对它们进行 CAS 更改。平衡问题很棘手。我可以看到您如何在添加某些内容并从某个点复制分支之前分析树枝。当您完成更改分支时,您将新分支放入。这对于大型分支来说将是一个问题。当线程没有争夺树时,也许可以“稍后”完成平衡。也许你可以做到这一点,即使你没有一直级联旋转,树仍然是可搜索的……换句话说,如果线程 A 添加了一个节点并且递归地旋转节点,线程 b 仍然可以读取或添加节点。只是一些想法。在某些情况下,我们会在 pNext 的 32 位之后的 32 位中创建一个具有版本号或锁定标志的结构。然后我们使用 64 位 CAS。也许您可以使树在任何时候都可以安全地读取而无需锁定,但您可能必须在正在修改的分支上使用版本控制技术。

    以下是我发表的一些关于原子操作优势的帖子:

    Pthreads and mutexes; locking part of an array

    Efficient and fast way for thread argument

    Configuration auto reloading with pthreads

    Advantages of using condition variables over mutex

    single bit manipulation

    Is memory allocation in linux non-blocking?

    【讨论】:

      【解决方案6】:

      您可以看看pthread_mutexattr_setprioceiling 函数。

      int pthread_mutexattr_setprioceiling
      (
          pthread_mutexatt_t * attr, 
          int prioceiling,
          int * oldceiling
      );
      

      来自文档:

      pthread_mutexattr_setprioceiling(3THR) 设置互斥属性对象的优先级上限属性。

      attr 指向由先前调用 pthread_mutexattr_init() 创建的互斥属性对象。

      prioceiling 指定已初始化互斥锁的优先级上限。上限定义了执行互斥锁保护的临界区的最低优先级。 prioceiling 将在 SCHED_FIFO 定义的最大优先级范围内。为避免优先级倒置,prioceiling 将设置为高于或等于所有可能锁定特定互斥锁的线程的最高优先级。

      oldceiling 包含旧的优先级上限。

      【讨论】:

      • 这并没有解决实际提出的问题。避免优先级倒置与避免饥饿是不同的问题。
      最近更新 更多