【发布时间】:2021-10-21 07:25:47
【问题描述】:
我们已经实现了TaskRunner,它的函数将被不同的线程调用来启动、停止和发布任务。 TaskRunner 将在内部创建一个线程,如果队列不为空,它将从队列中弹出任务并执行它。 Start() 将检查线程是否正在运行。如果没有创建一个新线程。 Stop() 将加入线程。代码如下。
bool TaskRunnerImpl::PostTask(Task* task) {
tasks_queue_.push_back(task);
return true;
}
void TaskRunnerImpl::Start() {
std::lock_guard<std::mutex> lock(is_running_mutex_);
if(is_running_) {
return;
}
is_running_ = true;
runner_thread_ = std::thread(&TaskRunnerImpl::Run, this);
}
void TaskRunnerImpl::Run() {
while(is_running_) {
if(tasks_queue_.empty()) {
continue;
}
Task* task_to_run = tasks_queue_.front();
task_to_run->Run();
tasks_queue_.pop_front();
delete task_to_run;
}
}
void TaskRunnerImpl::Stop() {
std::lock_guard<std::mutex> lock(is_running_mutex_);
is_running_ = false;
if(runner_thread_.joinable()) {
runner_thread_.join();
}
}
我们现在要使用条件变量,否则线程将不断检查任务队列是否为空。我们实现如下。
- 线程函数 (Run()) 将等待条件变量。
- PostTask() 会在有人发布任务时发出信号。
- Stop() 将在有人调用停止时发出信号。
代码如下。
bool TaskRunnerImpl::PostTask(Task* task) {
std::lock_guard<std::mutex> taskGuard(m_task_mutex);
tasks_queue_.push_back(task);
m_task_cond_var.notify_one();
return true;
}
void TaskRunnerImpl::Start() {
std::lock_guard<std::mutex> lock(is_running_mutex_);
if(is_running_) {
return;
}
is_running_ = true;
runner_thread_ = std::thread(&TaskRunnerImpl::Run, this);
}
void TaskRunnerImpl::Run() {
while(is_running_) {
Task* task_to_run = nullptr;
{
std::unique_lock<std::mutex> mlock(m_task_mutex);
m_task_cond_var.wait(mlock, [this]() {
return !(is_running_ && tasks_queue_.empty());
});
if(!is_running_) {
return;
}
if(!tasks_queue_.empty()) {
task_to_run = tasks_queue_.front();
task_to_run->Run();
tasks_queue_.pop_front();
}
}
if(task_to_run)
delete task_to_run;
}
}
void TaskRunnerImpl::Stop() {
std::lock_guard<std::mutex> lock(is_running_mutex_);
is_running_ = false;
m_task_cond_var.notify_one();
if(runner_thread_.joinable()) {
runner_thread_.join();
}
}
我有几个问题如下。谁能帮我理解这些。
-
条件变量 m_task_cond_var 与互斥锁 m_task_mutex 链接。但是 Stop() 已经将互斥锁 is_running_mutex 锁定为 'is_running_'。我需要在发信号之前锁定 m_task_mutex 吗?在这里,我不相信为什么要锁定 m_task_mutex,因为我们没有保护任何与任务队列相关的东西。
-
在线程函数(Run())中,我们正在读取 is_running_ 而不锁定 is_running_mutex。这是正确的吗?
【问题讨论】:
-
1:您需要锁定互斥锁以更新队列,但不必保持锁定信号:检查示例:en.cppreference.com/w/cpp/thread/condition_variable/wait 2:is_running 的使用不受保护,我会假设这段代码可以在 x64 架构中工作,但在其他地方可能会出现问题。为此,我建议检查 jthread 和 stop_token,这将帮助您减少复杂的循环。
-
不是直接你的问题,而是在执行
task_to_run->Run()时,m_task_mutex仍然被锁定,这会阻止发布其他任务。真的是你想要的吗? -
@prog-fh :感谢您指出这一点。我将更改我的代码。
-
不清楚为什么你需要额外的互斥锁来运行标志,一个互斥锁就可以了,让事情变得更简单。
-
您的
Run函数在访问is_running_时没有持有保护它的互斥锁。那是 UB,使代码的行为无法分析。
标签: c++ multithreading c++11