【问题标题】:Controlling the order of dequeing in the Semaphore Queue in Linux在 Linux 中控制信号量队列中的出队顺序
【发布时间】:2013-11-16 08:46:17
【问题描述】:

我想在我想为不同线程分配几个“优先级数字”的地方实现代码。一些线程可能在同一个信号量上等待。假设线程在信号量 S 上排队,另一个线程在信号量 S 上执行 sem_post。一旦执行 sem_post,我希望在信号量 S 队列中具有最高“优先级数”的线程能够访问信号量而不是任何其他线程。据我所知,没有直接的方法可以实现这一点,因为选择访问的线程可以是队列中的任何一个元素(不一定是 FIFO 等)。事实上,我尝试增加线程的 pthread 优先级,但我意识到它也不起作用。有人可以指导我如何在C中实现这种手动控制信号量队列的设计。提前谢谢你。

【问题讨论】:

    标签: linux multithreading pthreads semaphore


    【解决方案1】:

    我可以想到两种方法:

    • 使用condition variable 来“唤醒部分或全部服务员”,他们将自行整理优先释放;或
    • 使用(实时)信号按优先顺序“唤醒单个特定服务员”

    在每种情况下,信号量至少有一个mutex、一个和一些簿记。如果 value 小于零,那么它的绝对值就是等待者的数量(例如,value == -3 表示有 3 个线程在等待)。

    条件变量法

    信号量跟踪任何给定优先级的服务员数量,以及任何给定优先级释放的服务员数量。在伪 C 中:

    typedef struct priority_sem_s {
      int              value;     // if negative, abs(sem->value) == no. of waiting threads
      pthread_mutex_t  mutex;
      pthread_cond_t   cv;
      int              n_waiting[N_PRIORITIES];  // no. waiting (blocked) at each priority
      int              n_released[N_PRIORITIES]; // no. waiters released (unblocked) at each priority
    } priosem_t;
    
    void post(priosem_t *sem):
      lock(sem->mutex);
      sem->value++;
    
      if (sem->value <= 0 && prio_waiting_is_NOT_empty(sem)):
        // someone was waiting; release one of the highest prio
        int prio = fetch_highest_prio_waiting(sem);
        sem->prio_waiting[prio]--;
        sem->prio_released[prio]++;
        cond_broadcast(sem->cv, sem->mutex);
    
      unlock(sem->mutex);
    
    void wait(priosem_t *sem, int prio):
      lock(sem->mutex);
      sem->value--;
    
      if (sem->value < 0):
        // get in line
        sem->prio_waiting[prio]++;
        while (sem->prio_released[prio] < 0):
          cond_wait(sem->cv, sem->mutex);
        // ok to leave
        sem->prio_released[prio]--;
    
      unlock(sem->mutex);
    

    优点:可以跨进程共享(在共享内存中实现)。

    缺点:唤醒每个服务员只放一个。 Martin James 建议每个优先级使用一个条件变量,这会以更多同步原语为代价来减少“不必要的”唤醒。

    信号方法

    使用sigsuspendrealtime signal 和noop 处理程序来暂停和恢复服务员。在伪 C 中:

    typedef struct priority_sem_s {
      int              value;    // if negative, abs(value) == no. of waiting threads
      pthread_mutex_t  mutex;
      void            *waiting;  // ordered list of [priority, thread-id] pairs
    } priosem_t;
    
    void post(priosem_t *sem):
      lock(sem->mutex);
      sem->value++;
    
      if (sem->value <= 0 && waiting_queue_is_NOT_empty(sem)):
        pthread_t tid = pop_highest_prio_waiter(sem);
        pthread_kill(tid, SIGRTMIN+n);
    
      unlock(sem->mutex);
    
    void wait(priosem_t *sem, int prio):
      // XXX --> PRECONDITION:  SIGRTMIN+n is SIG_BLOCK'd <-- XXX
      // XXX --> PRECONDITION:  SIGRTMIN+n has a no-op handler installed <-- XXX
      lock(sem->mutex);
      sem->value--;
    
      if (sem->value < 0):
        // get in line
        add_me_to_wait_list(sem, pthread_self(), prio);
        unlock(sem->mutex);
        sigsuspend(full_mask_except_sigrtmin_plus_n);
        return;  // OK!
    
      unlock(sem->mutex);
    

    优点:概念上更简单;没有不必要的唤醒。

    缺点:不能跨进程共享。必须选择或动态选择可用的实时信号(寻找具有 SIG_DFL 配置的未屏蔽信号?)并尽早屏蔽。

    【讨论】:

    • +1 表示实际代码,(尽管我认为这是非常错误的......'勇敢':)
    • @MartinJames,你可以说“傻瓜”:)
    【解决方案2】:

    我认为您必须使用 post() 和 wait(priority) 方法构建自己的“PrioritySemaphore”(PS)类。您需要一个互斥锁来保护内部数据、一个“totalCount”整数和一个结构数组[priority],其中包含一个信号量供线程等待和一个“PriorityCount”整数。

    wait(priority):锁定互斥体。如果totalCount>0,dec它,解锁互斥体并返回。如果 totalCount=0,使用(priority)索引数组,包括 PriorityCount,解锁互斥锁并等待信号量。

    post():锁定互斥体。如果 totalCount=0, inc 它,解锁互斥锁并返回。如果 totalCount>0,从最高优先级的一端迭代数组,寻找非零的 PriorityCount。如果没有找到,inc totalCount,解锁互斥锁并返回。如果找到非零的 PriorityCount,则对其进行解码,以该优先级向信号量发出信号,解锁互斥体并返回。

    【讨论】:

    • IIUC,你在上面说“信号量”,我认为在 pthreads 中我们会说“条件变量”。我建议为每个优先级插槽设置一个“PendingReleaseCount”——如果找到一个服务员,则在 post() 时增加,并在 被唤醒后在 wait() 中减少——如果多个服务员可以达到相同的优先级.否则,您将面临一场竞赛,其中来自新 post() 的信号可能会“丢失”。
    • 你能否解释得更详细一点。我觉得上面提出的解决方案在等待功能中存在竞争条件。该函数释放互斥体,然后等待信号量。此外,假设一个线程首先出现并获取一个 PrioritySemaphore(初始值为 1,使其成为互斥体),如上实现的,然后假设另一个线程在该信号量上排队。当前者退出临界区并调用“post”函数时,totalcount=0,因此信号量被解锁,但后者线程没有获得控制权。我们需要再调用一次“发布”来启动后者。
    • @wang '函数释放互斥体,然后等待信号量'。当然。如果它在释放互斥锁之前等待信号量,则该类将死锁,因为互斥锁永远不会被释放。还在考虑你的另一点:)
    • @wang - 第二点 - 你想要一种在创建时初始化 PS 的方法,是吗?我必须承认我只考虑了初始计数为 0 的 PS。
    【解决方案3】:

    我必须开发一个具有以下特征的信号量结构:

    1. 有一个关键部分,最多Capacity 个线程可以同时进入和执行。执行后线程退出临界区;
    2. 当信号量达到最大容量,执行队列被填满时:队列中的线程进入休眠状态,当其他线程退出临界区时被唤醒;
    3. 执行队列具有 FIFO 语义;
    4. 有一种通知机制可以通知等待线程它们在队列中的位置;
    5. 只允许进入临界区的线程退出。

    第 1-2 点通常描述理论上的 semaphore 数据类型,而第 3-4 点则要求其他行为/API 约束和功能。毫不奇怪,这种结构可以仅使用 mutex条件变量 原语来构建,即使信号量本身经常被误认为是同步原语。它遵循 C++11 实现,也可以移植到提供上述原语的任何语言/环境。由于通知机制要求不要使信号量锁保持忙碌,因此该实现并非完全微不足道。自定义优先级和优先级编辑尚未实现,因为我不需要类似调度程序的功能,但它们应该也是可能的。

    Semaphore.h

    #pragma once
    
    #include <condition_variable>
    #include <mutex>
    #include <thread>
    #include <functional>
    #include <list>
    
    namespace usr
    {
        typedef std::function<void(unsigned processIndex)> SemaphoreNotifier;
    
        class Semaphore;
    
        class SemaphoreToken final
        {
            friend class Semaphore;
        public:
            SemaphoreToken();
        private:
            SemaphoreToken(Semaphore &semaphore);
        private:
            void Invalidate();
        private:
            Semaphore *Parent;
            std::thread::id ThreadId;
        };
    
        class SemaphoreCounter final
        {
            friend class Semaphore;
        public:
            SemaphoreCounter();
        private:
            void Increment();
        public:
            unsigned GetCount() const { return m_count; }
        private:
            unsigned m_count;
        };
    
        class Semaphore final
        {
            class Process
            {
            public:
                Process(unsigned index);
            public:
                void Wait();
                void Set();
                void Decrement();
                void Detach();
            public:
                bool IsDetached() const { return m_detached; }
                unsigned GetIndex() const { return m_index; }
            private:
                std::mutex m_mutex;
                unsigned m_index;                   // Guarded by m_mutex
                bool m_detached;                    // Guarded by m_mutex
                std::unique_lock<std::mutex> m_lock;
                std::condition_variable m_cond;
            };
        public:
            Semaphore(unsigned capacity = 1);
        public:
            SemaphoreToken Enter();
            SemaphoreToken Enter(SemaphoreCounter &counter, unsigned &id);
            SemaphoreToken Enter(const SemaphoreNotifier &notifier);
            SemaphoreToken Enter(const SemaphoreNotifier &notifier, SemaphoreCounter &counter, unsigned &id);
            bool TryEnter(SemaphoreToken &token);
            bool TryEnter(SemaphoreCounter &counter, unsigned &id, SemaphoreToken &token);
            void Exit(SemaphoreToken &token);
        private:
            bool enter(bool tryEnter, const SemaphoreNotifier &notifier, SemaphoreCounter *counter, unsigned &id, SemaphoreToken &token);
        private:
            // Disable copy constructor and assign operator
            Semaphore(const Semaphore &);
            Semaphore & operator=(const Semaphore &);
        public:
            unsigned GetCapacity() const { return m_capacity; }
        private:
            mutable std::mutex m_mutex;
            unsigned m_capacity;
            unsigned m_leftCapacity;               // Guarded by m_mutex
            std::list<Process *> m_processes;      // Guarded by m_mutex
        };
    }
    

    信号量.cpp

    #include "Semaphore.h"
    #include <cassert>
    #include <limits>
    #include <algorithm>
    
    using namespace std;
    using namespace usr;
    
    Semaphore::Semaphore(unsigned capacity)
    {
        if (capacity == 0)
            throw runtime_error("Capacity must not be zero");
    
        m_capacity = capacity;
        m_leftCapacity = capacity;
    }
    
    SemaphoreToken Semaphore::Enter()
    {
        unsigned id;
        SemaphoreToken token;
        enter(false, nullptr, nullptr, id, token);
        return token;
    }
    
    SemaphoreToken Semaphore::Enter(SemaphoreCounter &counter, unsigned &id)
    {
        SemaphoreToken token;
        enter(false, nullptr, &counter, id, token);
        return token;
    }
    
    SemaphoreToken Semaphore::Enter(const SemaphoreNotifier &notifier)
    {
        unsigned id;
        SemaphoreToken token;
        enter(false, notifier, nullptr, id, token);
        return token;
    }
    
    SemaphoreToken Semaphore::Enter(const SemaphoreNotifier &notifier,
        SemaphoreCounter &counter, unsigned &id)
    {
        SemaphoreToken token;
        enter(false, notifier, &counter, id, token);
        return token;
    }
    
    bool Semaphore::TryEnter(SemaphoreToken &token)
    {
        unsigned id;
        return enter(true, nullptr, nullptr, id, token);
    }
    
    bool Semaphore::TryEnter(SemaphoreCounter &counter, unsigned &id, SemaphoreToken &token)
    {
        return enter(true, nullptr, &counter, id, token);
    }
    
    bool Semaphore::enter(bool tryEnter, const SemaphoreNotifier &notifier,
        SemaphoreCounter *counter, unsigned &id, SemaphoreToken &token)
    {
        unique_lock<mutex> lock(m_mutex);
        if (counter != nullptr)
        {
            id = counter->GetCount();
            counter->Increment();
        }
    
        if (m_leftCapacity > 0)
        {
            // Semaphore is availabile without accessing queue
            assert(m_processes.size() == 0);
            m_leftCapacity--;
        }
        else
        {
            if (tryEnter)
                return false;
    
            Process process((unsigned)m_processes.size());
            unsigned previousIndex = numeric_limits<unsigned>::max();
            m_processes.push_back(&process);
    
            // Release semaphore unlock
            lock.unlock();
    
        NotifyAndWait:
            unsigned index = process.GetIndex();
            if (notifier != nullptr && index != 0 && index != previousIndex)
            {
                try
                {
                    // Notify the caller on progress
                    notifier(index);
                }
                catch (...)
                {
                    // Retake Semaphore lock
                    lock.lock();
    
                    // Remove the failing process
                    auto found = std::find(m_processes.begin(), m_processes.end(), &process);
                    auto it = m_processes.erase(found);
                    for (; it != m_processes.end(); it++)
                    {
                        // Decrement following processes
                        auto &otherProcess = **it;
                        otherProcess.Decrement();
                        otherProcess.Set();
                    }
    
                    // Rethrow. NOTE: lock will be unlocked by RAII
                    throw;
                }
                previousIndex = index;
            }
    
            process.Wait();
            if (!process.IsDetached())
                goto NotifyAndWait;
        }
    
        token = SemaphoreToken(*this);
        return true;
    }
    
    void Semaphore::Exit(SemaphoreToken &token)
    {
        if (this != token.Parent || token.ThreadId != this_thread::get_id())
            throw runtime_error("Exit called from wrong semaphore or thread");
    
        {
            unique_lock<mutex> lock(m_mutex);
            if (m_processes.size() == 0)
            {
                m_leftCapacity++;
            }
            else
            {
                auto front = m_processes.front();
                m_processes.pop_front();
                front->Detach();
                front->Set();
    
                for (auto process : m_processes)
                {
                    process->Decrement();
                    process->Set();
                }
            }
    
            token.Invalidate();
        }
    }
    
    SemaphoreToken::SemaphoreToken() :
        Parent(nullptr)
    {
    }
    
    SemaphoreToken::SemaphoreToken(usr::Semaphore &semaphore) :
        Parent(&semaphore),
        ThreadId(this_thread::get_id())
    {
    }
    
    void SemaphoreToken::Invalidate()
    {
        Parent = nullptr;
        ThreadId = thread::id();
    }
    
    SemaphoreCounter::SemaphoreCounter()
        : m_count(0)
    {
    }
    
    void SemaphoreCounter::Increment()
    {
        m_count++;
    }
    
    Semaphore::Process::Process(unsigned index) :
        m_index(index),
        m_detached(false),
        m_lock(m_mutex)
    {
    }
    
    void Semaphore::Process::Wait()
    {
        m_cond.wait(m_lock);
    }
    
    void Semaphore::Process::Set()
    {
        m_cond.notify_one();
    }
    
    void Semaphore::Process::Decrement()
    {
        unique_lock<mutex> lock(m_mutex);
        assert(m_index > 0);
        m_index--;
    }
    
    void Semaphore::Process::Detach()
    {
        unique_lock<mutex> lock(m_mutex);
        assert(m_index == 0);
        m_detached = true;
    }
    

    我使用以下示例代码对其进行了测试:

    SemaphoreCounter counter;
    Semaphore semaphore(4);  // Up to 4 threads can execute simultaneously
    
    vector<shared_ptr<thread>> threads;
    int threadCount = 300;
    for (int i = 0; i < threadCount; i++)
    {
        threads.push_back(std::make_shared<thread>([&semaphore, &counter]
        {
            unsigned threadId;
            auto token = semaphore.Enter([&threadId](unsigned index) {
                cout << "Thread " << threadId << " has " << index << " processes ahead before execution" << endl;
            }, counter, threadId);
    
            cout << "EXECUTE Thread " << threadId << endl;
            std::this_thread::sleep_for(15ms);
            semaphore.Exit(token);
        }));
    }
    
    for (int i = 0; i < threadCount; i++)
        threads[i]->join();
    

    【讨论】:

      猜你喜欢
      • 2012-10-18
      • 2010-10-15
      • 1970-01-01
      • 1970-01-01
      • 2011-07-14
      • 2013-07-05
      • 2020-03-05
      • 1970-01-01
      • 2017-04-18
      相关资源
      最近更新 更多