【问题标题】:Shared lock with two exclusive lock groups具有两个独占锁组的共享锁
【发布时间】:2020-07-06 14:41:04
【问题描述】:

我有两种方法“log”和“measure”,它们永远不应该同时执行。 所以我尝试使用“std::mutex”来执行此操作,如下所示:

void log(std::string message)
{
    mtx.lock();
    someLogFunctionality();
    mtx.unlock();
}

void measure()
{        
    mtx.lock();
    someMeasureFunctionality();
    mtx.unlock();
}

现在事实证明,也可以在不锁定的情况下并行多次调用“log”,同样适用于“measure”。 (原因:someLogFunctionality() 和 someMeasureFunctionality() 相互干扰,但同一个方法可能会被并行调用多次)

当时我查看了“std::shared_mutex”,但对我来说有两个问题:

1.) 使用 shared_mutex 我可以仅将 lock_shared 用于其中一种方法(日志或测量),但另一种方法必须使用排他锁(并且不能再次并行执行多次)

void log(std::string message)
{
    mtx.lock_shared();
    someLogFunctionality();
    mtx.unlock_shared();
}

void measure()
{        
    mtx.lock(); // This should also be shared but among another "group"
    someMeasureFunctionality();
    mtx.unlock();
}

2.) 我不能使用 C++17(我正在使用的环境中的约束)

您对我如何实现这一点有什么建议吗?

【问题讨论】:

  • 这是“单车道桥问题”的一个实例。你想要的可以使用信号量来完成。我不记得具体怎么做,所以我不会写答案,但如果你搜索那个短语,你应该会找到一些指南。
  • 顺便说一句,您应该删除 C 语言标签。 C 语言没有std::string 也没有std::shared_mutex。 C 语言没有命名空间。请酌情更新您的语言标签。

标签: c++ locking mutex semaphore


【解决方案1】:

根据 alexb 的回复,我编写了以下当前适用于我的互斥锁类(目前仅在一个简单的多线程示例应用程序中试用过)

请注意,它不受“饥饿”保护。简而言之:如果 lockLogging 被频繁调用(反之亦然),则不能确保 lockMeasure 会获得锁。

class MyMutex
{
private:
    std::atomic<int> log_executors;
    std::atomic<int> measure_executors;

    std::mutex mtx;
    std::condition_variable condition;

public:
    MyMutex() : log_executors(0), measure_executors(0) {}
    
    ~MyMutex() {}

    void lockMeasure()
    {   
        std::unique_lock<std::mutex> lock(mtx);

        while(log_executors) {
            condition.wait(lock); 
        }
        measure_executors++; 
    }
    
    void unlockMeasure()
    {   
        std::unique_lock<std::mutex> lock(mtx);

        measure_executors--; 
        if (!measure_executors)
        {
          condition.notify_all();
        }
    }
    
    void lockLogging()
    {         
        std::unique_lock<std::mutex> lock(mtx);

        while(measure_executors) {
          condition.wait(lock); 
        }
        log_executors++;
    }

    void unlockLogging()
    {         
        std::unique_lock<std::mutex> lock(mtx);

        log_executors--; 
        if (!log_executors)
        {
          condition.notify_all(); 
        }
    }

    static MyMutex& getInstance()
    {
        static MyMutex _instance;
        return _instance;
    }    
};

用法:

void measure()
{
    MyMutex::getInstance().lockMeasure();

    someMeasureFunctionality();

    MyMutex::getInstance().unlockMeasure();
}

void log()
{
    MyMutex::getInstance().lockLogging();

    someLogFunctionality();

    MyMutex::getInstance().unlockLogging();
}

【讨论】:

  • 我采用了制作parallel_locks 对象的路线,该对象生成单独的parallel_lock 对象,这些对象是实际的锁,因此可以按照正常的RAII 模式与std::unique_lock 一起使用。 coliru.stacked-crooked.com/a/8bfb052e0be10b93
  • 另请注意:如果您很狡猾,您的解锁方法实际上并不需要获取互斥锁。 condition.wait 也可以使用谓词,以避免将其置于循环中。
  • 为什么互斥锁是静态的?
  • 它是静态的,因为它在仅标头库中用作单例。而 measure/log 是可能在应用程序中多次实例化的类的方法。
【解决方案2】:

您需要一些比 shared_mutex 更复杂的屏障逻辑(顺便说一句,shared_mutex 不是多平台编译的最佳选择)。例如,您可以使用互斥锁、条件变量和 2 个变量进行屏障同步。它不占用 CPU,您不能使用 sleeps 进行检查。

#include <mutex>
#include <condition_variable>
#include <atomic>

std::atomic<int> log_executors = 0;
std::atomic<int> measure_executors = 0;

std::mutex mutex;
std::condition_variable condition;

void log(std::string message) {
  {
    std::unique_lock<std::mutex> lock(mutex);

    log_executors++;  // Register current executor and prevent from entering new measure executors

    // Wait until all measure executors will go away
    while(measure_executors) {
      condition.wait(lock);  // wait condition variable signal. Mutex will be unlocked during wait
    }
  }

  // here lock is freed
  someLogFunctionality(); // execute logic


  {
    std::unique_lock<std::mutex> lock(mutex);
    log_executors--;  // unregister current execution
    condition.notify_all();  // send signal and unlock all waiters
  }
}

void measure()
{        
  {
    std::unique_lock<std::mutex> lock(mutex);

    measure_executors++;  // Register current executor and prevent from entering new log executors
    while(log_executors) {
      condition.wait(lock);  // wait until all measure executors will gone
    }
  }

  someMeasureFunctionality();

  {
    std::unique_lock<std::mutex> lock(mutex);
    measure_executors--;  // unregister current execution
    condition.notify_all(); // send signal and unlock all waiters
  }
}

【讨论】:

  • stD::condition_variable 只需要一个函数指针,只是while(measure_executors) { condition.wait(lock); -> condition.wait(lock, [](){ return measure_executors; })
  • 您修改log_executorsmeasure_executors 而不持有互斥锁。虽然它们是原子的,但这并不能防止持有互斥锁的代码和访问原子的代码之间的竞争条件。 (想想看。互斥锁保护什么?)
  • 考虑:线程 A 正在调用 log 它持有互斥锁。它发现measure_executors 是1。然后线程B,调用measure 并且不持有锁递减measure_executors,然后调用notify_all。然后线程 A 再次运行,调用wait 等待已经发生的事情。这段代码是一场灾难。互斥锁不保护任何东西,使wait 函数容易陷入死锁。
  • 当然,通过减少锁定保护来修复。还可以使用 condition.wait_for(lock, std::chrono::milliseconds(milliseconds)) 代替 condition.wait(lock) 来完全防止竞争条件
  • 不幸的是,我遇到了死锁——两种方法都停在“condition.wait(lock);”处。假设:“测量”在 condition.wait() 处停止,因为 log_executors == 1(这会解锁互斥锁)-> 在等待时,再次调用“log”(log_executors = 2)并且在 condition.wait() 处也停止 ->第一个“log”完成(log_executors = 1)->这会唤醒“measure”,但它仍然等待,因为 log_executors 尚未 0 => 死锁,因为 log_executors 永远不会变为 0(“log”也在 condition.wait() 处等待测量完成)解决方案:当我在while循环之后移动增量时,它当前工作
【解决方案3】:

您可以拥有一个主锁来授予对信号量变量的访问权限:

void log(std::string message)
{
    acquire(LOG);
    someLogFunctionality();
    release(LOG);
}

void measure()
{        
    acquire(MEASURE);
    someMeasureFunctionality();
    release(MEASURE);
}

void acquire(int what) {
    for (;;) {
        mtx.lock();
        if (owner == NONE) {
            owner = what;
        }
        if (owner == what) {
            // A LOG was asked while LOG is running
            users[owner]++;
            mtx.unlock();
            return;
        }
        mtx.unlock();
        // Some sleep would be good
        usleep(5000);
    }
}

void release(int what) {
    mtx.lock();
    if (owner != what) {
        // This is an error. How could this happen?
    }
    if (users[what] <= 0) {
        // This is an error. How could this happen?
    }
    users[what]--;
    if (0 == users[what]) {
        owner = NONE;
    }
    mtx.unlock();
}

在这种情况下,例如:

owner is NONE
LOG1 acquires LOG. It can do so because owner is NONE
MEASURE1 acquires LOG. It starts spinning in place because owner != MEASURE
MEASURE2 acquires LOG. It starts spinning in place because owner != MEASURE
LOG2 acquires LOG. It can do so because owner is LOG, users[LOG]=2
LOG2 releases LOG. users[LOG]=1
LOG1 releases LOG. users[LOG]=0, so owner becomes NONE
MEASURE2 by pure chance acquires mtx before MEASURE1, finds owner=NONE and goes
MEASURE1 finds owner=MEASURE and sets users[MEASURE]=2

在上面,请注意对 measure() 的第二次调用实际上执行得更早一些。这应该没问题。但是,如果您想保持调用“序列化”,即使它们是并行发生的,您将需要为每个所有者提供一个堆栈和更复杂的代码。

【讨论】:

  • 睡眠/自旋方法真的很丑。
猜你喜欢
  • 2015-04-16
  • 2012-08-03
  • 1970-01-01
  • 2014-10-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-08-11
  • 1970-01-01
相关资源
最近更新 更多