【发布时间】:2021-04-09 21:17:22
【问题描述】:
考虑以下示例代码,其中线程 A 将函数推送到队列中,而线程 B 在从队列中弹出时执行这些函数:
std::atomic<uint32_t> itemCount;
//Executed by thread A
void run(std::function<void()> function) {
if (queue.push(std::move(function))) {
itemCount.fetch_add(1, std::memory_order_acq_rel);
itemCount.notify_one();
}
}
//Executed by thread B
void threadMain(){
std::function<void()> function;
while(true){
if (queue.pop(function)) {
itemCount.fetch_sub(1, std::memory_order_acq_rel);
function();
}else{
itemCount.wait(0, std::memory_order_acquire);
}
}
}
其中queue 是一个并发队列,它有一个push 和一个pop 函数,每个函数都返回一个bool,指示给定操作是否成功。因此,push 如果已满则返回 false,如果为空则pop 返回false。
现在我想知道代码是否在所有情况下都是线程安全的。
假设线程 B 的 pop 失败,即将调用 std::atomic<T>::wait。同时,线程 A 推送一个新元素,而线程 B 检查初始等待条件。由于itemCount 尚未更改,因此失败。
紧接着,线程 A 增加计数器并尝试通知一个正在等待的线程(尽管线程 B 还没有在内部等待)。线程 B 最终等待原子,导致线程由于丢失信号而永远不会再次唤醒尽管队列中有一个元素。只有当一个新元素被推入队列时才会停止,通知 B 继续执行。
我无法手动重现这种情况,因为时间几乎不可能正确。
这是一个严重的问题还是不可能发生?为了解决这种罕见的情况,确实存在哪些(最好是原子的)替代方案?
编辑: 顺便提一下,队列没有阻塞,只使用原子操作。
我问的原因是我不明白如何实现原子wait 操作。虽然标准说整个操作是原子的(由加载 + 谓词检查 + 等待组成),但在我使用的实现中 std::atomic<T>::wait 大致实现如下:
void wait(const _TVal _Expected, const memory_order _Order = memory_order_seq_cst) const noexcept {
_Atomic_wait_direct(this, _Atomic_reinterpret_as<long>(_Expected), _Order);
}
_Atomic_wait_direct 定义为
template <class _Ty, class _Value_type>
void _Atomic_wait_direct(
const _Atomic_storage<_Ty>* const _This, _Value_type _Expected_bytes, const memory_order _Order) noexcept {
const auto _Storage_ptr = _STD addressof(_This->_Storage);
for (;;) {
const _Value_type _Observed_bytes = _Atomic_reinterpret_as<_Value_type>(_This->load(_Order));
if (_Expected_bytes != _Observed_bytes) {
return;
}
__std_atomic_wait_direct(_Storage_ptr, &_Expected_bytes, sizeof(_Value_type), _Atomic_wait_no_timeout);
}
}
我们可以很清楚的看到,有一个原子负载以指定的内存顺序来检查原子本身的状态。但是,我看不出如何将 整个操作 视为原子操作,因为在调用 __std_atomic_wait_direct 之前有一个比较。
对于条件变量,谓词本身是由互斥锁保护的,但是原子本身是如何保护的呢?
【问题讨论】:
-
memory_order_acquire和memory_order_acq_rel是这里可能存在的问题。没有它们,您描述的问题就不会发生。有了他们,我不能告诉你。如果没有这些标志您无法证明您的代码是正确的,不要使用这些标志。 -
我已经怀疑这是一个问题,但为什么
memory_order_seq_cst在这种情况下是安全的? -
如果有像我一样的人想知道 C++
atomic有像 apis 这样的条件变量,那么答案就是 C++20 中的这个宇宙 Difference between std::atomic and std::condition_variable wait, notify_* methods -
在我看来,您正在尝试使用
std::atomic<uint32_t>作为计数信号量。我会改用std::counting_semaphore>。我也将它构建到队列本身,所以push和pop处理它,而外部世界不会。顺便说一句,我还将超时传递给pop,因此它会在失败前自动等待指定的时间。 -
wait是原子操作,fetch_add也是如此。它们不会交错,而是以某种全局顺序执行。如果wait恰好先执行,那么它后面会跟着fetch_add和notify_all,这会唤醒它。如果fetch_add恰好先执行,那么在wait执行时,itemCount.load() != 0和wait将立即返回。你认为wait是分步执行的——检查原子的值,然后进入等待——但这与认为fetch_add先获取然后添加,并担心另一个操作会挤在两者之间是错误的。跨度>
标签: c++ multithreading wait atomic c++20