【问题标题】:C++20 thread possibly waiting on std::atomic foreverC++20 线程可能永远在 std::atomic 上等待
【发布时间】:2021-04-09 21:17:22
【问题描述】:

考虑以下示例代码,其中线程 A 将函数推送到队列中,而线程 B 在从队列中弹出时执行这些函数:

std::atomic<uint32_t> itemCount;

//Executed by thread A
void run(std::function<void()> function) {

    if (queue.push(std::move(function))) {
        itemCount.fetch_add(1, std::memory_order_acq_rel);
        itemCount.notify_one();
    }

}


//Executed by thread B
void threadMain(){

    std::function<void()> function;

    while(true){

        if (queue.pop(function)) {

            itemCount.fetch_sub(1, std::memory_order_acq_rel);
            function();

        }else{
            itemCount.wait(0, std::memory_order_acquire);
        }

    }

}

其中queue 是一个并发队列,它有一个push 和一个pop 函数,每个函数都返回一个bool,指示给定操作是否成功。因此,push 如果已满则返回 false,如果为空则pop 返回false

现在我想知道代码是否在所有情况下都是线程安全的。 假设线程 B 的 pop 失败,即将调用 std::atomic&lt;T&gt;::wait。同时,线程 A 推送一个新元素,而线程 B 检查初始等待条件。由于itemCount 尚未更改,因此失败。

紧接着,线程 A 增加计数器并尝试通知一个正在等待的线程(尽管线程 B 还没有在内部等待)。线程 B 最终等待原子,导致线程由于丢失信号而永远不会再次唤醒尽管队列中有一个元素。只有当一个新元素被推入队列时才会停止,通知 B 继续执行。

我无法手动重现这种情况,因为时间几乎不可能正确。

这是一个严重的问题还是不可能发生?为了解决这种罕见的情况,确实存在哪些(最好是原子的)替代方案?

编辑: 顺便提一下,队列没有阻塞,只使用原子操作。


我问的原因是我不明白如何实现原子wait 操作。虽然标准说整个操作是原子的(由加载 + 谓词检查 + 等待组成),但在我使用的实现中 std::atomic&lt;T&gt;::wait 大致实现如下:

void wait(const _TVal _Expected, const memory_order _Order = memory_order_seq_cst) const noexcept {
    _Atomic_wait_direct(this, _Atomic_reinterpret_as<long>(_Expected), _Order);
}

_Atomic_wait_direct 定义为

template <class _Ty, class _Value_type>
void _Atomic_wait_direct(
    const _Atomic_storage<_Ty>* const _This, _Value_type _Expected_bytes, const memory_order _Order) noexcept {
    const auto _Storage_ptr = _STD addressof(_This->_Storage);
    for (;;) {
        const _Value_type _Observed_bytes = _Atomic_reinterpret_as<_Value_type>(_This->load(_Order));
        if (_Expected_bytes != _Observed_bytes) {
            return;
        }

        __std_atomic_wait_direct(_Storage_ptr, &_Expected_bytes, sizeof(_Value_type), _Atomic_wait_no_timeout);
    }
}

我们可以很清楚的看到,有一个原子负载以指定的内存顺序来检查原子本身的状态。但是,我看不出如何将 整个操作 视为原子操作,因为在调用 __std_atomic_wait_direct 之前有一个比较。

对于条件变量,谓词本身是由互斥锁保护的,但是原子本身是如何保护的呢?

【问题讨论】:

  • memory_order_acquirememory_order_acq_rel 是这里可能存在的问题。没有它们,您描述的问题就不会发生。有了他们,我不能告诉你。如果没有这些标志您无法证明您的代码是正确的,不要使用这些标志
  • 我已经怀疑这是一个问题,但为什么 memory_order_seq_cst 在这种情况下是安全的?
  • 如果有像我一样的人想知道 C++ atomic 有像 apis 这样的条件变量,那么答案就是 C++20 中的这个宇宙 Difference between std::atomic and std::condition_variable wait, notify_* methods
  • 在我看来,您正在尝试使用 std::atomic&lt;uint32_t&gt; 作为计数信号量。我会改用std::counting_semaphore&gt;。我也将它构建到队列本身,所以pushpop 处理它,而外部世界不会。顺便说一句,我还将超时传递给pop,因此它会在失败前自动等待指定的时间。
  • wait 是原子操作,fetch_add 也是如此。它们不会交错,而是以某种全局顺序执行。如果wait 恰好先执行,那么它后面会跟着fetch_addnotify_all,这会唤醒它。如果fetch_add 恰好先执行,那么在wait 执行时,itemCount.load() != 0wait 将立即返回。你认为wait 是分步执行的——检查原子的值,然后进入等待——但这与认为fetch_add 先获取然后添加,并担心另一个操作会挤在两者之间是错误的。跨度>

标签: c++ multithreading wait atomic c++20


【解决方案1】:

标准是这样说的:

[intro.races]/4 对特定原子对象M 的所有修改都以某个特定的总顺序发生,称为M 的修改顺序。

[atomics.wait]/4 对原子对象M 的原子等待操作的调用有资格通过调用原子通知来解除阻塞如果XYM 存在副作用,则对M 进行操作,这样:
(4.1)——原子等待操作在观察X的结果后已经阻塞,
(4.2)——XM的修改顺序中在Y之前,并且
(4.3) — Y 发生在调用原子通知操作之前。

您假设以下情况:

  1. itemCount 的当前值为零,来自原始初始化或先前的fetch_sub
  2. wait 加载 itemCount 并观察到 ​​0 的值。
  3. 另一个线程调用fetch_addnotify_one
  4. wait 被阻塞,因为它认为现在过时的值 0。

在这种情况下,MitemCountX 是旧的 fetch_sub,它使值变为 0(我们假设它发生在很久以前并且对所有线程都正确可见)和 Y是将值更改为 1 的 fetch_add

标准规定wait 调用(跨越第 2 步和第 4 步)实际上可以通过第 3 步的notify_one 调用解除阻塞。确实:

(4.1) - 在观察itemCount == 0 后,wait 已经阻塞(如果没有,则问题没有出现)。
(4.2) - fetch_subitemCount 的修改顺序中位于 fetch_add 之前(假设 fetch_sub 发生在很久以前)。
(4.3) - fetch_add 发生在之前(实际上是排序在之前)notify_one;它们被同一个线程一个接一个地调用。

因此,符合要求的实现要么一开始就不允许wait 阻塞,要么让notify_one 唤醒它;它不允许错过通知。


本次讨论中唯一涉及内存顺序的地方是 (4.1),“原子等待操作在 观察X 的结果后阻塞”。也许fetch_add 实际上发生在wait 之前(按挂钟),但wait 看不到,所以无论如何它都会阻塞。但这与结果无关——要么wait 观察到fetch_add 的结果,并且根本不阻塞;或者它会观察旧的fetch_sub 和阻塞的结果,但是需要notfiy_one 来唤醒它。

【讨论】:

  • 所以,我想关键是 eligible to be unblocked by 允许相当弱的排序。加载 X 的结果(其中 X 在修改顺序中位于 Y 之前)并不能证明您的加载发生在 Y 之前,但发生在之前不是 eligible to be unlocked by 所要求的。
【解决方案2】:

首先要指出可能导致混淆的原因:函数__std_atomic_wait_direct完全独立,以原子方式进行比较和阻塞。它归结为对WaitOnAddressuses a condition variable 的调用,在保持关联互斥体的同时完成比较。

if (_Expected_bytes != _Observed_bytes) 只是一种优化。如果变量已经与预期值不同,我们可以立即返回,而无需对底层原子函数进行相对昂贵的调用。它不需要与块是原子的,并且可以在不影响语义的情况下完全删除。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-03-28
    • 2017-08-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多