【问题标题】:Using std::condition_variable with atomic<bool>将 std::condition_variable 与 atomic<bool> 一起使用
【发布时间】:2016-03-21 08:40:23
【问题描述】:

关于 SO 处理 atomic 和其他处理 std::condition_variable 有几个问题。但是我的问题是我在下面的使用是否正确?

三个线程,一个 ctrl 线程在取消暂停其他两个线程之前进行准备工作。 ctrl 线程还能够在工作线程(发送方/接收方)处于紧密的发送/接收循环时暂停它们。 使用原子的想法是在未设置暂停布尔值的情况下使紧密循环更快。

class SomeClass
{

public:
    //...                                                                                                                                                                                                                                                                                                                                                                                   
    // Disregard that data is public...                                                                                                                                                                                                                                                                                                                                                     

    std::condition_variable cv; // UDP threads will wait on this cv until allowed                                                                                                                                                                                                                                                                                                           
                                // to run by ctrl thread.                                                                                                                                                                                                                                                                                                                                   
    std::mutex cv_m;
    std::atomic<bool> pause_test_threads;
};

void do_pause_test_threads(SomeClass *someclass)
{
    if (!someclass->pause_test_threads)
    {
        // Even though we use an atomic, mutex must be held during                                                                                                                                                                                                                                                                                                                          
        // modification. See documentation of condition variable                                                                                                                                                                                                                                                                                                                            
        // notify_all/wait. Mutex does not need to be held for the actual                                                                                                                                                                                                                                                                                                                   
        // notify call.                                                                                                                                                                                                                                                                                                                                                                     
        std::lock_guard<std::mutex> lk(someclass->cv_m);
        someclass->pause_test_threads = true;
    }
}

void unpause_test_threads(SomeClass *someclass)
{
    if (someclass->pause_test_threads)
    {
        {
            // Even though we use an atomic, mutex must be held during                                                                                                                                                                                                                                                                                                                      
            // modification. See documentation of condition variable                                                                                                                                                                                                                                                                                                                        
            // notify_all/wait. Mutex does not need to be held for the actual                                                                                                                                                                                                                                                                                                               
            // notify call.                                                                                                                                                                                                                                                                                                                                                                 
            std::lock_guard<std::mutex> lk(someclass->cv_m);
            someclass->pause_test_threads = false;
        }
        someclass->cv.notify_all(); // Allow send/receive threads to run.                                                                                                                                                                                                                                                                                                                   
    }
}

void wait_to_start(SomeClass *someclass)
{
    std::unique_lock<std::mutex> lk(someclass->cv_m); // RAII, no need for unlock.                                                                                                                                                                                                                                                                                                          
    auto not_paused = [someclass](){return someclass->pause_test_threads == false;};
    someclass->cv.wait(lk, not_paused);
}

void ctrl_thread(SomeClass *someclass)
{
    // Do startup work                                                                                                                                                                                                                                                                                                                                                                      
    // ...                                                                                                                                                                                                                                                                                                                                                                                  
    unpause_test_threads(someclass);

    for (;;)
    {
        // ... check for end-program etc, if so, break;                                                                                                                                                                                                                                                                                                                                     
        if (lost ctrl connection to other endpoint)
        {
            pause_test_threads();
        }
        else
        {
            unpause_test_threads();
        }
        sleep(SLEEP_INTERVAL);

    }

    unpause_test_threads(someclass);
}

void sender_thread(SomeClass *someclass)
{
    wait_to_start(someclass);
    ...
    for (;;)
    {
        // ... check for end-program etc, if so, break;                                                                                                                                                                                                                                                                                                                                     
        if (someclass->pause_test_threads) wait_to_start(someclass);
        ...
    }
}

void receiver_thread(SomeClass *someclass)
{
    wait_to_start(someclass);
    ...
    for (;;)
    {
        // ... check for end-program etc, if so, break;                                                                                                                                                                                                                                                                                                                                     
        if (someclass->pause_test_threads) wait_to_start(someclass);
        ...
    }

【问题讨论】:

    标签: c++ multithreading c++11


    【解决方案1】:

    我查看了你操作条件变量和原子的代码,似乎是正确的,不会引起问题。

    为什么你应该保护对共享变量的写入,即使它是原子的:

    如果在谓词中检查共享变量和等待条件之间发生写入共享变量可能会出现问题。考虑以下:

    1. 等待线程虚假唤醒,获取互斥体,检查谓词并将其评估为false,因此它必须再次等待 cv。

    2. 控制线程将共享变量设置为true

    3. 控制线程发送通知,任何人都没有收到,因为没有线程在等待条件变量。

    4. 等待线程等待条件变量。由于通知已经发送,它会等到下一次虚假唤醒,或者下次控制线程发送通知时。可能会无限期地等待。

    从共享原子变量中读取而不加锁通常是安全的,除非它引入了TOCTOU problems

    在您的情况下,您正在读取共享变量以避免不必要的锁定,然后在锁定后再次检查它(在有条件的wait 调用中)。这是一个有效的优化,称为双重检查锁定,我在这里看不到任何潜在问题。

    您可能想检查atomic&lt;bool&gt; 是否无锁。否则,您将拥有更多没有它的锁。

    【讨论】:

    • 似乎所有 x86_64 机器都有无锁原子 ,所有其他 X86:es 都有布尔值的原子比较交换。 (我从 /usr/include/boost/atomic/detail/gcc-x86.hpp 得出这个结论)。
    • 基于您示例中的问题,可以说您可以安全地写入共享变量而不锁定互斥锁 if 这样的写入永远不会导致谓词等待线程评估为true(想象这可以通过代码本身的逻辑来保证),并且仅在“唤醒”写入完成时锁定?
    【解决方案2】:

    一般来说,您希望独立于变量与条件变量一起工作的方式来对待变量是原子的这一事实。

    如果与条件变量交互的所有代码都遵循在查询/修改之前锁定互斥锁的通常模式,并且与条件变量交互的代码不依赖于不与条件变量交互的代码,它将继续即使它包装了一个原子互斥体也是正确的。

    通过快速阅读您的伪代码,这似乎是正确的。但是,对于多线程代码,伪代码通常不能很好地替代真实代码。

    当原子读取表明您可能想要也可能不想要优化时,仅等待条件变量(并锁定互斥锁)的“优化”。您需要分析吞吐量。

    【讨论】:

      【解决方案3】:

      原子数据不需要再次同步,它是无锁算法和数据结构的基础。

      void do_pause_test_threads(SomeClass *someclass)
      {
          if (!someclass->pause_test_threads)
          {
              /// your pause_test_threads might be changed here by other thread
              /// so you have to acquire mutex before checking and changing
              /// or use atomic methods - compare_exchange_weak/strong,
              /// but not all together                                                                                                                                                                                                                                                                                                                                                             
              std::lock_guard<std::mutex> lk(someclass->cv_m);
              someclass->pause_test_threads = true;
          }
      }
      

      【讨论】:

      • 一般来说,是的,但在这种情况下,不是。检查文档以了解 C++11 条件变量的用法,请参阅 en.cppreference.com/w/cpp/thread/condition_variable
      • 这不是同时更改的问题(分配给 atomic 不会导致数据竞争)。需要互斥锁是因为如果虚假唤醒和线程执行交错恰到好处,等待线程可能会错过通知并进入无限等待。
      • @Revolver_Ocelot:好的,感谢您的评论。也许需要详细说明并可能将其变成一个完整的答案?在我的代码中,我在互斥锁下进行了所有更改,但在紧密循环中读取了没有互斥锁的原子布尔值。
      • @ErikAlapää 在你的情况下,你的阅读在某种意义上是不依赖的。当您尝试在“条件变量”通信系统期间依靠原子性时,会出现没有互斥锁的原子变量和条件代码的问题:如果这样做,很容易得到一个可能的窗口您的侦听线程正在等待已发出信号的变量。
      猜你喜欢
      • 2014-01-25
      • 2021-07-06
      • 1970-01-01
      • 1970-01-01
      • 2020-03-28
      • 1970-01-01
      • 2012-03-22
      • 2014-02-28
      • 1970-01-01
      相关资源
      最近更新 更多