【问题标题】:lock-free "closable" MPSC queue无锁“可关闭”MPSC 队列
【发布时间】:2020-06-13 13:00:57
【问题描述】:

多生产者单消费者场景,除了消费发生一次,之后队列“关闭”并且不允许更多工作。我有一个 MPSC 队列,所以我尝试添加一个无锁算法来“关闭”队列。我相信它是正确的,它通过了我的测试。问题是当我尝试优化内存顺序时它停止工作(我认为工作丢失了,例如在队列关闭后排队)。即使在具有“某种”强内存模型的 x64 上,即使只有一个生产者。

我对内存顺序进行微调的尝试被注释掉了:

// thread-safe for multi producers single consumer use
// linked-list based, and so it's growable 
MPSC_queue work_queue;
std::atomic<bool> closed{ false };
std::atomic<int32_t> producers_num{ 0 };
bool produce(Work&& work)
{
    bool res = false;

    ++producers_num;
    // producers_num.fetch_add(1, std::memory_order_release);
    if (!closed)
    // if (!closed.load(std::memory_order_acquire))
    {
        work_queue.push(std::move(work));
        res = true;
    }
    --producers_num;
    // producers_num.fetch_sub(1, std::memory_order_release);

    return res;
}
void consume()
{
    closed = true;
    // closed.store(true, std::memory_order_release);

    while (producers_num != 0)
    // while (producers_num.load(std::memory_order_acquire) != 0)
        std::this_thread::yield();

    Work work;
    while (work_queue.pop(work))
        process(work);
}

我还尝试了std::memory_order_acq_relproducers_num 的读-修改-写操作,也不起作用。

一个额外的问题:

该算法与 MPSC 队列一起使用,它已经在内部进行了一些同步。将它们结合起来以获得更好的性能会很好。你知道“可关闭”MPSC 队列的任何此类算法吗?

【问题讨论】:

  • work_queue.push 到底是做什么的?那是线程安全的吗?您似乎遗漏了变量声明等关键细节。
  • 这是 MPSC 队列 - 无锁多生产者单消费者
  • @PeterCordes:添加了缺失的细节
  • 由于您只想使用一次队列,它不需要环绕并且可能会简单得多。就像一个原子生产者位置计数器,作者递增以声明一个位置,如果他们得到一个位置 > 大小,那么队列就满了。
  • @PeterCordes:不确定我是否关注。队列永远不会满,因为它是基于链表的。这可能是一个重要的细节,添加到问题中

标签: c++ multithreading lock-free


【解决方案1】:

我认为closed = true; 确实需要 seq_cst 以确保它对其他线程可见您第一次检查producers_num。否则这个排序是可能的:

  • 制作人:++producers_num;
  • 消费者:producers_num == 0
  • 生产者:if (!closed) 发现它仍然打开
  • 消费者:close.store(true, release) 变得全局可见。
  • 消费者:work_queue.pop(work) 发现队列为空
  • 生产者:work_queue.push(std::move(work)); 在消费者停止查找之后将工作添加到队列中。

如果您在返回之前有消费者检查 producers_num == 0,您仍然可以避免 seq_cst,例如

    while (producers_num != 0)
    // while (producers_num.load(std::memory_order_acquire) != 0)
        std::this_thread::yield();

    do {
        Work work;
        while (work_queue.pop(work))
            process(work);
    } while(producers_num.load(acquire) != 0);
    // safe if pop included a full barrier, I think

我不是 100% 确定我有这个权利,但我认为在完全障碍后检查 producer_num 就足够了。

但是,生产者端确实需要++producers_num; 至少是acq_rel,否则它可以重新排序超过if (!closed)。 (在它之后,if(!closed) 之前的获取栅栏也可能有效)。


由于您只想使用一次队列,因此它不需要环绕并且可能会简单得多。就像一个原子生产者位置计数器,作者递增以声明一个位置,如果他们得到一个位置 > 大小,那么队列就满了。不过,我还没有考虑完整的细节。

这可能会为上述问题提供更简洁的解决方案,也许是让消费者查看该写入索引以查看是否有任何生产者

【讨论】:

  • 抱歉回复晚了。这对我来说确实很有意义,尽管我不明白为什么我们需要这里的顺序一致性。为避免重新排序您提到的,我们需要 closed = true 到“之前发生”while (producers_num != 0)。这可以通过closed.store(true, std::memory_order_acq_rel) 实现吗?不确定memory_order_acq_rel 是否适用(严格来说)存储操作。
  • @AndriyTylychko:不适用。纯存储只能是relax、release或seq_cst。获取应用没有负载部分。如果你愿意,你可以把它写成 acq_rel exchange。 (有趣的事实:x86 编译器使用 xchg 来实现 seq_cst 存储。)
猜你喜欢
  • 2019-07-04
  • 2017-04-21
  • 2011-08-30
  • 2013-12-15
  • 1970-01-01
  • 1970-01-01
  • 2013-04-22
  • 2017-10-20
  • 1970-01-01
相关资源
最近更新 更多