【问题标题】:What is the best way to realize a synchronization barrier between threads实现线程之间同步屏障的最佳方法是什么
【发布时间】:2016-12-24 07:19:14
【问题描述】:

有几个线程在运行,我需要保证我的每个线程都达到了某个点,然后再继续。我需要实施一种障碍。考虑一个可以从多个线程运行的函数func

void func()
{
  operation1();
  // wait till all threads reached this point 
  operation2();
}

使用 C++ 11 和 VS12 实现这一障碍的最佳方法是什么,如果需要,可以考虑提升。

【问题讨论】:

  • 简单的方法是将工作分离到两个不同的函数中,在主线程中创建线程来运行前半部分,加入所有线程然后运行后半部分。
  • 这实际上会很复杂。我的架构不允许我将工作分开。
  • 有一个称为屏障的实际线程原语用于此目的。我知道 pthread 实现了这一点。此外,boost 实现了这一点。
  • this question 的答案似乎很好地涵盖了这一点。

标签: c++ multithreading c++11


【解决方案1】:

你可以使用boost::barrier
不幸的是,线程屏障概念本身并不是 c++11 或 Visual c++ 的一部分。
在纯 c++11 中,您可以使用 condition variable 和计数器。

#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>

class my_barrier
{

 public:
    my_barrier(int count)
     : thread_count(count)
     , counter(0)
     , waiting(0)
    {}

    void wait()
    {
        //fence mechanism
        std::unique_lock<std::mutex> lk(m);
        ++counter;
        ++waiting;
        cv.wait(lk, [&]{return counter >= thread_count;});
        cv.notify_one();
        --waiting;
        if(waiting == 0)
        {
           //reset barrier
           counter = 0;
        }
        lk.unlock();
    }

 private:
      std::mutex m;
      std::condition_variable cv;
      int counter;
      int waiting;
      int thread_count;
};

int thread_waiting = 3;
my_barrier barrier(3);


void func1()
{
    std::this_thread::sleep_for(std::chrono::seconds(3));
    barrier.wait();
    std::cout << "I have awakened" << std::endl;
}

void func2()
{
    barrier.wait();
    std::cout << "He has awakened!!" << std::endl;
}

int main() {
    std::thread t1(func1);  
    std::thread t2(func2);
    std::thread t3(func2);
    t1.join();
    t2.join();
    t3.join();
}

每个线程都等待直到满足谓词。最后一个线程将使谓词有效,并允许等待的线程继续。如果你想重复使用 障碍(例如多次调用该函数),您需要另一个 变量来重置计数器。

当前的实现是有限的。两次调用func();func(); 可能不会让线程第二次等待。

【讨论】:

  • 非常感谢...看起来很简单,很有前途!
  • wait 的示例实现;想象一下 2 个线程的障碍 - 它们都调用wait,调用等待第二个计数的线程将取得进展。但是由于第一次调用wait 的线程永远不会被唤醒(notified),理论上它可能会永远等待。
【解决方案2】:

一个选项可能是使用 OpenMP 框架。

#include <omp.h>

void func()
{
  #pragma omp parallel num_threads(number_of_threads)
  {
    operation1();

    #pragma omp barrier
    // wait till all threads reached this point 

    operation2();
  }
}

使用 -fopenmp 编译代码

【讨论】:

  • 听起来很有趣,是否有任何文档/参考如何使用 omp?
  • 这里有一个很好的入门教程openmp tutorial,参考这里是openmp reference。一个优秀的论坛在这里openmp forum
  • 听起来不便携
【解决方案3】:

解决方案

#include <cassert>
#include <condition_variable>

class Barrier
{

public:

    Barrier(std::size_t nb_threads)
        : m_mutex(),
        m_condition(),
        m_nb_threads(nb_threads)
    {
        assert(0u != m_nb_threads);
    }

    Barrier(const Barrier& barrier) = delete;

    Barrier(Barrier&& barrier) = delete;

    ~Barrier() noexcept
    {
        assert(0u == m_nb_threads);
    }

    Barrier& operator=(const Barrier& barrier) = delete;

    Barrier& operator=(Barrier&& barrier) = delete;

    void Wait()
    {
        std::unique_lock< std::mutex > lock(m_mutex);

        assert(0u != m_nb_threads);

        if (0u == --m_nb_threads)
        {
            m_condition.notify_all();
        }
        else
        {
            m_condition.wait(lock, [this]() { return 0u == m_nb_threads; });
        }
    }

private:

    std::mutex m_mutex;

    std::condition_variable m_condition;

    std::size_t m_nb_threads;
};

示例

#include <chrono>
#include <iostream>
#include <thread>

Barrier barrier(2u);

void func1()
{
    std::this_thread::sleep_for(std::chrono::seconds(3));
    barrier.Wait();
    std::cout << "t1 awakened" << std::endl;
}

void func2()
{
    barrier.Wait();
    std::cout << "t2 awakened" << std::endl;
}

int main()
{
    std::thread t1(func1);  
    std::thread t2(func2);
    t1.join();
    t2.join();

    return 0;
}

在线试用WandBox

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-12-01
    • 2017-03-02
    • 1970-01-01
    • 1970-01-01
    • 2021-06-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多