【问题标题】:Is it mandatory to lock mutex before signaling on condition variable?在条件变量上发出信号之前是否必须锁定互斥锁?
【发布时间】:2021-10-21 07:25:47
【问题描述】:

我们已经实现了TaskRunner,它的函数将被不同的线程调用来启动、停止和发布任务。 TaskRunner 将在内部创建一个线程,如果队列不为空,它将从队列中弹出任务并执行它。 Start() 将检查线程是否正在运行。如果没有创建一个新线程。 Stop() 将加入线程。代码如下。

bool TaskRunnerImpl::PostTask(Task* task) {
  tasks_queue_.push_back(task);
  return true;
}

void TaskRunnerImpl::Start() {
  std::lock_guard<std::mutex> lock(is_running_mutex_);
  if(is_running_) {
    return;
  }
  is_running_ = true;

  runner_thread_ = std::thread(&TaskRunnerImpl::Run, this);
}

void TaskRunnerImpl::Run() {
  while(is_running_) {
    if(tasks_queue_.empty()) {
      continue;
    }
    Task* task_to_run = tasks_queue_.front();
    task_to_run->Run();
    tasks_queue_.pop_front();

    delete task_to_run;
  }
}

void TaskRunnerImpl::Stop() {
  std::lock_guard<std::mutex> lock(is_running_mutex_);
  is_running_ = false;
  if(runner_thread_.joinable()) {
    runner_thread_.join();
  }
}

我们现在要使用条件变量,否则线程将不断检查任务队列是否为空。我们实现如下。

  • 线程函数 (Run()) 将等待条件变量。
  • PostTask() 会在有人发布任务时发出信号。
  • Stop() 将在有人调用停止时发出信号。

代码如下。

bool TaskRunnerImpl::PostTask(Task* task) {
  std::lock_guard<std::mutex> taskGuard(m_task_mutex);
  tasks_queue_.push_back(task);
  m_task_cond_var.notify_one();
  return true;
}

void TaskRunnerImpl::Start() {
  std::lock_guard<std::mutex> lock(is_running_mutex_);
  if(is_running_) {
    return;
  }
  is_running_ = true;

  runner_thread_ = std::thread(&TaskRunnerImpl::Run, this);
}

void TaskRunnerImpl::Run() {
    while(is_running_) {
        Task* task_to_run = nullptr;

        {
            std::unique_lock<std::mutex> mlock(m_task_mutex);
            m_task_cond_var.wait(mlock, [this]() {
                return !(is_running_ && tasks_queue_.empty());
            });

            if(!is_running_) {
                return;
            }

            if(!tasks_queue_.empty()) {
                task_to_run = tasks_queue_.front();
                task_to_run->Run();
                tasks_queue_.pop_front();
            }
        }

        if(task_to_run)
            delete task_to_run;
    }
}

void TaskRunnerImpl::Stop() {
    std::lock_guard<std::mutex> lock(is_running_mutex_);
    is_running_ = false;
    m_task_cond_var.notify_one();
    if(runner_thread_.joinable()) {
        runner_thread_.join();
    }
}

我有几个问题如下。谁能帮我理解这些。

  1. 条件变量 m_task_cond_var 与互斥锁 m_task_mutex 链接。但是 Stop() 已经将互斥锁 is_running_mutex 锁定为 'is_running_'。我需要在发信号之前锁定 m_task_mutex 吗?在这里,我不相信为什么要锁定 m_task_mutex,因为我们没有保护任何与任务队列相关的东西。

  2. 在线程函数(Run())中,我们正在读取 is_running_ 而不锁定 is_running_mutex。这是正确的吗?

【问题讨论】:

  • 1:您需要锁定互斥锁以更新队列,但不必保持锁定信号:检查示例:en.cppreference.com/w/cpp/thread/condition_variable/wait 2:is_running 的使用不受保护,我会假设这段代码可以在 x64 架构中工作,但在其他地方可能会出现问题。为此,我建议检查 jthread 和 stop_token,这将帮助您减少复杂的循环。
  • 不是直接你的问题,而是在执行task_to_run-&gt;Run() 时,m_task_mutex 仍然被锁定,这会阻止发布其他任务。真的是你想要的吗?
  • @prog-fh :感谢您指出这一点。我将更改我的代码。
  • 不清楚为什么你需要额外的互斥锁来运行标志,一个互斥锁就可以了,让事情变得更简单。
  • 您的Run 函数在访问is_running_ 时没有持有保护它的互斥锁。那是 UB,使代码的行为无法分析。

标签: c++ multithreading c++11


【解决方案1】:

在发信号 [In Stop] 之前是否需要锁定 m_task_mutex

condition_variable::wait 方法中测试的谓词依赖于信号线程中发生的某些事情时(几乎总是如此),那么您应该在发出信号之前获取互斥锁。如果您持有m_task_mutex,请考虑以下可能性:

  1. 观察者线程 (TaskRunnerImpl::Run) 被唤醒(通过虚假唤醒或从其他地方收到通知)并获得互斥体。
  2. 观察者线程检查它的谓词,发现它是false
  3. 信号器线程 (TaskRunnerImpl::Stop) 将谓词更改为返回 true(通过设置 is_running_ = false;)。
  4. 信号器线程向条件变量发出信号。
  5. 观察者线程等待发出信号(错误)
    • 信号来来去去
    • 谓词是false,所以观察者开始等待,可能无限期地等待。

如果您在发出信号时持有互斥锁,可能发生的最糟糕的情况是,被阻塞的线程 (TaskRunnerImpl::Run) 会在尝试获取互斥锁时被唤醒并立即被阻塞。这可能会对性能产生一些影响。


在 [TaskRunnerImpl::Run] 中,我们正在读取 is_running_ 而不锁定 is_running_mutex。这是正确的吗?

一般不会。即使它是bool 类型。因为布尔值通常实现为单个字节,所以可能有一个线程在您读取时正在写入该字节,从而导致部分读取。然而,在实践中,它是安全的。也就是说,您应该在阅读之前获取互斥锁(然后立即释放)。

事实上,最好使用std::atomic&lt;bool&gt; 而不是bool + mutex 的组合(或者std::atomic_flag,如果你想变得花哨的话)会有同样的效果,但更容易工作与。

【讨论】:

  • “在发出信号之前,您不需要获得条件变量正在等待的同一个互斥体。”实际上你做到了。
  • @Slava:也许我不清楚?信号器不需要获取互斥锁,但是在等待条件变量时,应该在调用wait之前保持互斥锁
  • 不,你说的很清楚。如果您在未锁定或未锁定互斥条件变量的情况下更改监视变量和信号,您将面临竞争条件 - 您可能会错过通知。
  • "除非编译器执行重新排序以在赋值之前放置信号(如果我们使用 atomic 则不会)" 不,它可能在没有重新排序的情况下发生。观察线程由于某种原因唤醒,检查这个标志(它还没有发生)并且还没有在条件变量上进入睡眠状态。现在信号线程更改布尔和信号。观察线程会错过它,因为它已经检查过标志但它也不会收到信号,因为它还没有等待条件变量。为了避免这种情况,专门创建了休眠和解锁互斥锁的原子性。
  • 是的,基本上系统保证条件变量上的信号发生之后wait()解锁的互斥锁将被传递。为了保证您发送信号之后,您必须在互斥锁锁定期间或解锁之后更改相同互斥锁和信号下的条件,而不是之前。
【解决方案2】:

在发信号 [In Stop] 之前是否需要锁定 m_task_mutex

是的,你知道。您必须在同一互斥锁下更改条件,并在互斥锁锁定或更改后解锁后发送信号。如果您不使用相同的互斥体,或者在该互斥体被锁定之前发送信号,则会创建 std::condition_variable 来解决的竞争条件。

逻辑是这样的:

监视线程锁定互斥体并检查监视条件。如果它没有发生,它会进入睡眠状态并原子地解锁互斥锁。所以信号线程锁定互斥体,改变条件和信号。如果信号线程在观察一个锁定互斥体之前这样做,那么观察一个会看到情况发生并且不会进入睡眠状态。如果之前被锁住了,它会在信号线程发出信号时进入休眠状态并唤醒。

注意:您可以在互斥锁解锁之前或之后向条件变量发出信号,这两种情况都是正确的,但可能会影响性能。但是在锁定互斥体之前发出信号是不正确的。

条件变量 m_task_cond_var 与互斥锁 m_task_mutex 链接。但是 Stop() 已经将互斥锁 is_running_mutex 锁定为 'is_running_'。我需要在发信号之前锁定 m_task_mutex 吗?在这里,我不相信为什么要锁定 m_task_mutex,因为我们没有保护任何与任务队列相关的东西。

您使代码过于复杂,使事情变得更糟。在这种情况下,您应该只使用一个互斥锁,它会按预期工作。

在线程函数(Run())中,我们正在读取 is_running_ 而不锁定 is_running_mutex。这是正确的吗?

在 x86 硬件上它可能“工作”,但从语言的角度来看,这是 UB。

【讨论】:

    猜你喜欢
    • 2013-12-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-05-07
    相关资源
    最近更新 更多