【问题标题】:Stopping C++ 11 std::threads waiting on a std::condition_variable停止 C++ 11 std::threads 在 std::condition_variable 上等待
【发布时间】:2014-03-12 12:02:20
【问题描述】:

我正在尝试了解新 C++ 11 标准中的基本多线程机制。我能想到的最基本的例子如下:

  • 生产者和消费者在不同的线程中实现
  • 生产者将一定数量的项目放入队列中
  • 消费者从队列中取出物品(如果有的话)

这个例子也在许多关于多线程的教科书中使用,关于通信过程的一切都很好。但是,在停止消费者线程时我遇到了问题。

我希望消费者一直运行,直到它得到一个明确的停止信号(在大多数情况下,这意味着我等待生产者完成,以便我可以在程序结束之前停止消费者)。不幸的是,C++ 11 线程缺乏中断机制(例如,我从 Java 中的多线程中知道)。因此,我必须使用isRunning 之类的标志来表示我希望线程停止。

现在的主要问题是:在我停止生产者线程后,队列为空,消费者正在等待condition_variable 以在队列再次填充时获取信号。所以我需要在退出之前通过在变量上调用notify_all()来唤醒线程。

我找到了一个可行的解决方案,但它似乎有点混乱。 下面列出了示例代码(我很抱歉,但不知何故,对于“最小”的最小示例,我无法进一步减小代码大小):

队列类:

class Queue{
public:
    Queue() : m_isProgramStopped{ false } { }

    void push(int i){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_q.push(i);
        m_cond.notify_one();
    }

    int pop(){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cond.wait(lock, [&](){ return !m_q.empty() || m_isProgramStopped; });

        if (m_isProgramStopped){
            throw std::exception("Program stopped!");
        }

        int x = m_q.front();
        m_q.pop();

        std::cout << "Thread " << std::this_thread::get_id() << " popped " << x << "." << std::endl;
        return x;
    }

    void stop(){
        m_isProgramStopped = true;
        m_cond.notify_all();
    }

private:
    std::queue<int> m_q;
    std::mutex m_mtx;
    std::condition_variable m_cond;
    bool m_isProgramStopped;
};

制作人:

class Producer{
public:
    Producer(Queue & q) : m_q{ q }, m_counter{ 1 } { }

    void produce(){
        for (int i = 0; i < 5; i++){
            m_q.push(m_counter++);
            std::this_thread::sleep_for(std::chrono::milliseconds{ 500 });
        }
    }

    void execute(){
        m_t = std::thread(&Producer::produce, this);
    }

    void join(){
        m_t.join();
    }

private:
    Queue & m_q;
    std::thread m_t;

    unsigned int m_counter;
};

消费者:

class Consumer{
public:
    Consumer(Queue & q) : m_q{ q }, m_takeCounter{ 0 }, m_isRunning{ true }
    { }

    ~Consumer(){
        std::cout << "KILL CONSUMER! - TOOK: " << m_takeCounter << "." << std::endl;
    }

    void consume(){
        while (m_isRunning){
            try{
                m_q.pop();
                m_takeCounter++;
            }
            catch (std::exception e){
                std::cout << "Program was stopped while waiting." << std::endl;
            }
        }
    }

    void execute(){
        m_t = std::thread(&Consumer::consume, this);
    }

    void join(){
        m_t.join();
    }

    void stop(){
        m_isRunning = false;
    }

private:
    Queue & m_q;
    std::thread m_t;

    unsigned int m_takeCounter;
    bool m_isRunning;
};

最后是main()

int main(void){
    Queue q;

    Consumer cons{ q };
    Producer prod{ q };

    cons.execute();
    prod.execute();

    prod.join();

    cons.stop();
    q.stop();

    cons.join();

    std::cout << "END" << std::endl;

    return EXIT_SUCCESS;
}

这是结束等待条件变量的线程的正确方法还是有更好的方法?目前,队列需要知道程序是否已经停止(我认为这会破坏组件的松散耦合),我需要在队列上显式调用stop(),这似乎不正确。

此外,如果队列为空,则应该用作信号的条件变量现在代表另一个条件 - 如果程序已经结束。如果我没记错的话,每次线程在条件变量上等待某个事件发生时,它还必须检查线程是否必须在继续执行之前停止(这似乎也是错误的)。

我有这些问题是因为我的整个设计有问题还是我缺少一些可以用来以干净方式退出线程的机制?

【问题讨论】:

  • 这几乎就是我们在代码中所做的。设置一个“停止”变量并​​通知条件变量,并让它测试该标志作为它所做的第一件事。我们似乎没有找到一个简单、更优雅的通用解决方案。
  • 您可以从队列的析构函数中调用stop()。查看类似的解决方案stackoverflow.com/a/9711916/412080
  • 你也可以使用哨兵 (c2.com/cgi/wiki?SentinelPattern)
  • Queue::m_isProgramStoppedConsumer::m_isRunning 都应该是 atomic&lt;bool&gt;atomic_flag,因为 main() 在不受互斥锁保护的情况下同时写入它们。
  • @Praetorian Making Queue::m_isProgramStopped atomic 将使其免于数据竞争,但仍允许stop 中的notifypop 中的wait 之间的竞争。在pop 中的线程检查m_isProgramStopped 之后,stopnotify 中的线程可能在条件变量上休眠,导致通知丢失并且主线程永远等待永远不会退出的消费者。更好的解决方案是使用m_mtx 保护对m_isProgramStopped 的所有访问。

标签: c++ multithreading c++11 interrupt condition-variable


【解决方案1】:

不,你的设计没有问题,这是解决这类问题的正常方法。

将多个条件(例如队列中的任何内容或程序停止)附加到条件变量是完全有效的。关键是当wait 返回时检查条件中的位。

与其在Queue 中使用标志来指示程序正在停止,不如将标志视为“我可以接受”。这是一个更好的整体范例,在多线程环境中效果更好。

另外,如果有人调用它并且 stop 已被调用,您可以用 bool try_pop(int &amp;value) 替换该方法,而不是让 pop 抛出异常,如果返回值则返回 true,否则 false .这样,调用者可以检查失败以查看队列是否已停止(添加bool is_stopped() const 方法)。虽然异常处理在这里有效,但它有点笨拙,在多线程程序中并不是一个真正的例外情况。

【讨论】:

  • 但是如果线程运行的任务处理时间很长,那么需要等待一段时间,stop()调用才会生效。那么在这方面,这是最好的设计吗?
  • 您能否详细说明“队列中没有标志表示程序正在停止,您应该将标志视为“我可以接受””是什么意思?
【解决方案2】:

wait 可以超时调用。控制权返回给线程并且可以检查stop。根据该值,它可以wait 处理更多要消耗或完成执行的项目。使用 c++ 进行多线程的一个很好的介绍是 C++11 Concurrency

【讨论】:

猜你喜欢
  • 2020-11-22
  • 1970-01-01
  • 2012-10-17
  • 1970-01-01
  • 2020-03-28
  • 2020-11-01
  • 1970-01-01
  • 2015-08-26
相关资源
最近更新 更多