【问题标题】:Synchronizing producer/consumer with a barrier将生产者/消费者与屏障同步
【发布时间】:2018-10-16 08:30:45
【问题描述】:

我有一个生产者线程,它为三个消费者线程生产工作。当工作产生时,生产者线程等待直到消费者线程完成处理工作。然后生产者线程继续处理结果。

#include <condition_variable>
#include <mutex>
#include <boost/thread/barrier.hpp>
#include <vector>
#include <queue>


std::condition_variable cond;
std::mutex mutex;
boost::barrier barrier(4);

std::vector<std::thread> workers;
std::queue<unsigned int> work;
std::queue<unsigned int> results;

void worker();

int main()
{
    // 1 producer and 3 consumers
    for(unsigned int i = 0; i < 3; i++)
        workers.push_back(std::thread(worker));

    // Wait here so the three workers can get to cond.wait();
    barrier.wait();

    std::unique_lock<std::mutex> lock(mutex);
    while(true)
    {
        // Generate work
        std::cout << "gen" << std::endl;
        for(unsigned int i = 0; i < 10; i++)
            work.push(i);

        cond.notify_all();

        lock.unlock();
        barrier.wait();

        // Handle the results
        while(results.size() > 0)
            results.pop();

        lock.lock();
    }

    return 0;
}

void worker()
{
    while(true)
    {
        std::unique_lock<std::mutex> lock(mutex);
        while(results.size() == 0)
        {
            lock.unlock();
            barrier.wait();
            lock.lock();
            cond.wait(lock);
        }

        // Get work
        unsigned int next = work.front();
        work.pop();

        // Store the result
        results.push(next);

        lock.unlock();


    }
}

问题是在生产者线程开始下一次迭代之前,我需要确保所有消费者线程都进入cond.wait(lock)

  1. 所有 4 个线程都已到达屏障。屏障被释放,线程可以继续。
  2. 生产者线程在所有消费者线程到达cond.wait(lock) 之前锁定互斥体。因此,至少有一个消费者线程被lock.lock() 阻塞。
  3. 生产者线程开始下一次迭代,创建工作并通知消费者。由于至少一个消费者线程尚未到达cond.wait(lock),因此至少一个消费者线程将错过notify_all()。这些线程现在等待下一个notify_all() - 这永远不会到达。
  4. 下次到达屏障时,至少有一个消费者线程仍在等待下一个notify_all()。因此屏障不会被解锁并发生死锁。

我该如何解决这种情况?

【问题讨论】:

  • 您能评论一下您为什么拒绝我的问题吗?
  • 可能是因为你没有minimal reproducible example
  • 这个例子现在应该更好了。
  • 我不明白你为什么需要屏障。 produce work =&gt; wait until all work resolved =&gt; combine the results. 还不够吗?
  • 我怎么能等到所有工作都解决了,而不是循环旋转?

标签: c++ multithreading mutex producer-consumer barrier


【解决方案1】:

条件变量应与标志一起使用,以帮助防止虚假唤醒。同样的标志也可以用来检查线程是应该等待还是直接开始工作。

添加一个bool go_to_work=false;,然后我们只需将其作为谓词添加到wait 的调用中,并确保我们在主线程中设置/取消设置它。

在调用 notify_all 之前在主线程中设置布尔值

go_to_work=true;
cond.notify_all();

在我们的工作线程中,我们将谓词添加到 wait 调用中

cond.wait(lock, [](){ return go_to_work; });

最后,在我们的主线程中,我们希望在所有工作完成后将标志设置回 false。

barrier.wait();
lock.lock();  // We need to lock the mutex before modifying the bool
go_to_work=false;
lock.unlock();

//Handle result...

现在,如果一个线程在主线程设置go_to_work=true 之后到达wait 调用,它根本不会等待,而是继续工作。作为奖励,这还可以防止虚假唤醒。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-13
    • 2018-01-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多