【问题标题】:Syncing Threads in Boost在 Boost 中同步线程
【发布时间】:2015-09-23 08:56:43
【问题描述】:

我正在尝试创建一个创建一个主线程和 10 个从线程的应用程序。我想在主线程运行后运行一次从属线程。所以对于每个主线程执行,每个从线程将执行一次。我试图用两个不同的conditional variables 来处理这个问题。因此,一个用于从属线程,以便它们可以等到主线程通知它们,另一个conditional variable 用于在每个子线程完成其任务后发出信号的主线程,因此主线程可以检查是否所有从属线程都已完成或不。代码如下:

// STD 
#include <iostream>
#include <vector>


// BOOST
#include <boost/thread.hpp>
#include <boost/atomic.hpp>

std::vector<boost::thread*> threads;

std::vector<boost::mutex*> data_ready_mutex;
std::vector<boost::condition_variable*> cond;
std::vector<bool> data_ready;
std::vector<int> num_run;

boost::mutex check_finish_mutex;
std::vector<bool> finished;

boost::atomic<int> data;
boost::atomic<int> next_thread_id;

boost::mutex finished_task_mutex;
boost::condition_variable finished_task_cond;
bool finished_task = false;

void signal_finished(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(finished_task_mutex);
    finished[id] = true;
    finished_task = true;
  }
  finished_task_cond.notify_all();
}

void signal_slave(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(*data_ready_mutex[id]);

    data_ready[id] = true;
  }
  cond[id]->notify_all();
}

void slave_therad()
{
  int id = next_thread_id++;

  std::cout << "( " << id << " ) slave_thread created\n";
  while (true)
  {
    boost::unique_lock<boost::mutex> lock(*data_ready_mutex[id]);
    while (!data_ready[id])
    {
      cond[id]->wait(lock);
    }

    finished[id] = false;

    data_ready[id] = false;

    data++;

    num_run[id]++;

    signal_finished(id);
  }
}

void main()
{
  size_t nThreads = 10;

  data_ready_mutex.resize(nThreads);
  cond.resize(nThreads);
  data_ready.resize(nThreads);
  finished.resize(nThreads);
  num_run.resize(nThreads, 0);
  for (size_t i = 0; i < nThreads; i++)
  {
    data_ready_mutex[i] = new boost::mutex();
    cond[i] = new boost::condition_variable();
    data_ready[i] = false;
    finished[i] = false;
  }

  for (size_t i = 0; i < nThreads; i++)
  {
    threads.push_back(new boost::thread(slave_therad));
  }

  while (true)
  {
    clock_t start_time = clock();

    for (size_t i = 0; i < threads.size(); i++)
      signal_slave(static_cast<int>(i));

    while (true)
    {
      boost::unique_lock<boost::mutex> lock(finished_task_mutex);
      while (!finished_task)
      {
        finished_task_cond.wait(lock);
      }
      finished_task = false;

      size_t i = 0;
      for (; i < finished.size(); i++)
      {
        if (!finished[i]) break;
      }
      if (i == finished.size()) break;
    }

    clock_t end_time = clock();

    std::cout << "Elapsed Time = " << static_cast<float>(end_time - start_time) / CLOCKS_PER_SEC << std::endl;

    for (size_t i = 0; i < threads.size(); i++)
      finished[i] = false;
  }

  for (size_t i = 0; i < nThreads; i++)
  {
    threads[i]->join();
  }
}

问题是代码在某处停止并陷入死锁。

另外,我尝试改变实现方式。因此,我使用了atomic&lt;int&gt;,它计算完成任务的线程数,并在主线程中检查线程数是否等于已更新自身的线程数,但此方法也卡在某处并进入僵局。 代码可以在这里找到:

// STD 
#include <iostream>
#include <vector>


// BOOST
#include <boost/thread.hpp>
#include <boost/atomic.hpp>

std::vector<boost::thread*> threads;                //!< Slave Threads array

std::vector<boost::mutex*>  data_ready_mutex;       //!< Mutex to guard the data_ready 
std::vector<bool>           data_ready;             //!< Shows if the data is ready for the slave thread or not.
std::vector<boost::condition_variable*> cond;       //!< conditional variable to wait on data being ready for the slave thread.

std::vector<int> num_run;                           //!< Stores the number of times each slave thread is run.

boost::atomic<int> data;                            //!< Stores the data processed by each slave thread
boost::atomic<int> next_thread_id;                  //!< id for the next thread (used for giving an id from 0,..., nThreads-1

boost::atomic<int> num_threads_done;                //!< Stores the number of slave threads which has finished their task

//! Signals a slave thread to start its task
void signal_slave(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(*data_ready_mutex[id]);
    data_ready[id] = true;
  }
  cond[id]->notify_all();
}

//! Slave thread function
void slave_therad()
{
  // assign an id to the current slave_thread
  int id = next_thread_id++;

  std::cout << "( " << id << " ) slave_thread created\n";
  while (true)
  {
    // wait for a signal from the main thread
    boost::unique_lock<boost::mutex> lock(*data_ready_mutex[id]);
    while (!data_ready[id])
    {
      cond[id]->wait(lock);
    }

    // make the data not ready, so the loop is not going to run without the main thread signal after the thread is done.
    data_ready[id] = false;

    // TASK for SLAVE THREAD
    data++;

    // Increase the number of times the thread is run
    num_run[id]++;

    // Increase the number of threads which has finished their tasks.
    num_threads_done++;

  }
}

void main()
{
  size_t nThreads = 10;

  // creating the data ready mutexes, conditional variables, data_ready variable (bools), num_runs array.
  data_ready_mutex.resize(nThreads);
  cond.resize(nThreads);
  data_ready.resize(nThreads);
  num_run.resize(nThreads, 0);
  for (size_t i = 0; i < nThreads; i++)
  {
    data_ready_mutex[i] = new boost::mutex();
    cond[i] = new boost::condition_variable();
    data_ready[i] = false;
  }

  // Creating the slave threads
  for (size_t i = 0; i < nThreads; i++)
  {
    threads.push_back(new boost::thread(slave_therad));
  }

  // Main Thread Body
  while (true)
  {
    clock_t start_time = clock();

    // Reset the number of threads which are done.
    num_threads_done = 0;

    // Signals the slave threads to start doing their task.
    for (size_t i = 0; i < threads.size(); i++)
      signal_slave(static_cast<int>(i));

    // Wait until all the slave threads are done.
    while (true)
      if (num_threads_done == threads.size()) break;

    clock_t end_time = clock();

    std::cout << "Elapsed Time = " << static_cast<float>(end_time - start_time) / CLOCKS_PER_SEC << std::endl;
  }

  for (size_t i = 0; i < nThreads; i++)
  {
    threads[i]->join();
  }

}

甚至,我尝试使用障碍解决问题,但它并没有解决我的问题。代码如下:

// STD 
#include <iostream>
#include <vector>


// BOOST
#include <boost/thread.hpp>
#include <boost/atomic.hpp>

boost::barrier* barrier;                            //!< barrier to make sure all the slave threads are done their tasks.

std::vector<boost::thread*> threads;

std::vector<boost::mutex*>  data_ready_mutex;       //!< Mutex to guard the data_ready 
std::vector<bool>           data_ready;             //!< Shows if the data is ready for the slave thread or not.
std::vector<boost::condition_variable*> cond;       //!< conditional variable to wait on data being ready for the slave thread.

std::vector<int> num_run;                           //!< Stores the number of times each slave thread is run.

boost::atomic<int> data;                            //!< Stores the data processed by each slave thread
boost::atomic<int> next_thread_id;                  //!< id for the next thread (used for giving an id from 0,..., nThreads-1

boost::atomic<int> num_threads_done;                //!< Stores the number of slave threads which has finished their task

std::vector<bool> finished;                         //!< Array which stores if all the slave threads are done or not.
boost::mutex finished_task_mutex;                   //!< mutex to guard the finished_task variable
boost::condition_variable finished_task_cond;       //!< Conditional variable to wait for all the threads to finish they tasks.
boost::atomic<bool> finished_task(false);           //!< Variable which stores if the task of slave_threads are finished or not.

void signal_finished(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(finished_task_mutex);
    finished[id] = true;
    finished_task = true;
  }
  finished_task_cond.notify_all();
}

void signal_slave(const int& id)
{
  {
    boost::lock_guard<boost::mutex> lock(*data_ready_mutex[id]);

    data_ready[id] = true;
  }
  cond[id]->notify_all();
}

void slave_therad()
{
  int id = next_thread_id++;

  std::cout << "( " << id << " ) slave_thread created\n";
  while (true)
  {
    boost::unique_lock<boost::mutex> lock(*data_ready_mutex[id]);
    while (!data_ready[id])
    {
      cond[id]->wait(lock);
    }

    finished[id] = false;

    data_ready[id] = false;

    data++;

    num_run[id]++;

    barrier->wait();

    signal_finished(id);
  }
}

void main()
{
  size_t nThreads = 10;

  data_ready_mutex.resize(nThreads);
  cond.resize(nThreads);
  data_ready.resize(nThreads);
  finished.resize(nThreads);
  num_run.resize(nThreads, 0);
  for (size_t i = 0; i < nThreads; i++)
  {
    data_ready_mutex[i] = new boost::mutex();
    cond[i] = new boost::condition_variable();
    data_ready[i] = false;
    finished[i] = false;
  }

  barrier = new boost::barrier(nThreads);

  for (size_t i = 0; i < nThreads; i++)
  {
    threads.push_back(new boost::thread(slave_therad));
  }

  while (true)
  {
    clock_t start_time = clock();

    for (size_t i = 0; i < threads.size(); i++)
      signal_slave(static_cast<int>(i));

    while (true)
    {
      boost::unique_lock<boost::mutex> lock(finished_task_mutex);
      while (!finished_task)
      {
        finished_task_cond.wait(lock);
      }
      finished_task = false;
      break;
    }

    clock_t end_time = clock();

    std::cout << "Elapsed Time = " << static_cast<float>(end_time - start_time) / CLOCKS_PER_SEC << std::endl;

    for (size_t i = 0; i < threads.size(); i++)
      finished[i] = false;
  }

  for (size_t i = 0; i < nThreads; i++)
  {
    threads[i]->join();
  }
}

[更新] 因此,我只是在结构中使用了mutexconditional variablesdata_ready,如下所示,现在代码可以正常工作了。我认为使用pointer to mutex 等存在错误。代码如下:

//#define SYNC_WITH_BARRIER
#define SYNC_WITH_ATOMICS

// STD 
#include <iostream>
#include <vector>


// BOOST
#include <boost/thread.hpp>
#include <boost/atomic.hpp>
#include <boost/ptr_container/ptr_vector.hpp>

std::vector<boost::thread*> threads;

boost::atomic<int> next_thread_id(0);

boost::mutex finished_task_mutex;
boost::condition_variable finished_task_cond;
bool finished_task = false;

boost::atomic<int> num_finished_tasks(0);

struct Work
{
  Work(boost::barrier& _barrier) : b(&_barrier)
  {

  }

  boost::barrier*           b;
  boost::mutex              data_ready_mutex;
  boost::condition_variable data_ready_cond;
  bool                      data_ready;
  int                       num_run;
  boost::atomic<int>        data;
  bool                      finished;

  void signal_slave()
  {
    {
      boost::lock_guard<boost::mutex> lock(data_ready_mutex);
      data_ready = true;
      data_ready_cond.notify_all();
    }    
  }

  void slave_therad()
  {
    int id = next_thread_id++;

    std::cout << "( " << id << " ) slave_thread created\n";
    while (true)
    {
      boost::unique_lock<boost::mutex> lock(data_ready_mutex);
      while (!data_ready)
      {
        data_ready_cond.wait(lock);
      }

      finished = false;

      data_ready = false;

      data++;

      num_run++;

#ifdef SYNC_WITH_BARRIER
      b->count_down_and_wait();
#else 
#ifdef SYNC_WITH_ATOMICS
      num_finished_tasks++;
#endif
#endif
    }
  }

};

#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>

using hrc = boost::chrono::high_resolution_clock;

void main()
{
  size_t nThreads = 10;

  boost::thread_group tg;
  boost::ptr_vector<Work> work_items;
  work_items.reserve(nThreads);

  boost::barrier finish(nThreads + 1); // one for the main thread

  for (size_t i = 0; i < nThreads; i++)
  {
    work_items.push_back(new Work(finish));
    tg.create_thread(boost::bind(&Work::slave_therad, boost::ref(work_items.back()))); 
  }

  while (true)
  {
    auto start_time = hrc::now();

    num_finished_tasks = 0;

    for (size_t i = 0; i < work_items.size(); i++)
      work_items[i].signal_slave();

#ifdef SYNC_WITH_BARRIER
    finish.count_down_and_wait();
#else
#ifdef SYNC_WITH_ATOMICS
    while (true) if (num_finished_tasks == work_items.size()) break;
#endif
#endif

    clock_t end_time = clock();

    std::cout << "Elapsed Time = " << hrc::now() - start_time << std::endl;

  }

  for (size_t i = 0; i < nThreads; i++)
  {
    threads[i]->join();
  }
}

【问题讨论】:

  • @Niall 我在设置 cond 变量等后尝试创建线程,但它仍然陷入死锁
  • 您可以将std::move()std::thread 放入容器中,避免动态分配,这对我来说已经足够了。
  • Ulrich 是对的,尝试放置 mThread.join();到 main() 结束
  • 这不是我的意思,@Ivan。 main() 中的无用线程mThread 可以用对线程函数的简单调用来替换。顺便说一句:main() 已经在一个线程中运行(你总是at至少一个线程!),所以这个名字有点误导。
  • 或屏障/信号量

标签: c++ multithreading boost boost-thread


【解决方案1】:

@sehe 即使有屏障,它也陷入了僵局。 – mmostajab 5 mins ago

由于您没有展示任何关于您在那里所做的事情,所以让我通过整合您收到的所有建议来帮助您启动:

Live On Coliru

#include <boost/atomic.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <vector>

namespace /*static*/ {
    boost::atomic<int> data;
    boost::atomic<int> num_threads_done;

    struct Work {
        void signal_slave()
        {
            boost::lock_guard<boost::mutex> lock(data_ready_mutex);
            data_ready = true;
            cond.notify_all();
        }

        void slave_thread()
        {
            static boost::atomic_int _id_gen(0);
            id = _id_gen++;

            std::cout << "(" << id << ") slave_thread created\n";
            while (true) {

                boost::unique_lock<boost::mutex> lock(data_ready_mutex);
                cond.wait(lock, [&]{ return data_ready; });

                data_ready = false;

                data++;

                num_run++;

                num_threads_done++;
            }
        }

      private:
        int id          = 0;
        bool data_ready = false;
        int  num_run    = 0;

        boost::mutex data_ready_mutex;
        boost::condition_variable cond;

    };

}

#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>

using hrc = boost::chrono::high_resolution_clock;

int main()
{
    boost::thread_group tg;

    size_t nThreads = 10;

    std::vector<Work> works(nThreads);

    for (size_t i = 0; i < nThreads; i++) {
        tg.create_thread(boost::bind(&Work::slave_thread, boost::ref(works[i])));
    }

    while (true) {
        auto start_time = hrc::now();

        for (auto& w : works)
            w.signal_slave();

        std::cout << "Elapsed Time = " << (hrc::now()-start_time) << std::endl;
    }

    tg.join_all();
}

请记住,我不知道您要在这里实现什么。添加障碍我想到了这一点:how to use boost barrier

【讨论】:

  • 这完全不同,因为最初的主循环总是向所有工作人员发出一个任务,然后等待他们完成后再处理下一轮。
  • @UlrichEckhardt 是的。您是对的,但是我将条件变量添加到主线程以等待从线程,并且它可以正常工作。
  • 现在,我的问题是我的代码和你的代码有什么区别? (除了清楚)?他们在做同样的事情,但我的不工作。
  • 看看“我添加了条件变量”在代码中的含义会很有趣。条件变量总是意味着关联的互斥体,因此它也意味着内存屏障。关于障碍的相关性,请参阅我对上述原子操作重新排序的评论。
  • 关于 OP 的自我回答更新:在这种情况下,仍然避免手动内存管理、共享全局变量、魔术标志(我认为完成标志已损坏):@ 987654324@.
【解决方案2】:

我尝试更改@sehe 答案,因此它完全解决了我正在寻找的问题,并且我实现了此代码:

#include <boost/atomic.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <vector>

namespace /*static*/ {
  boost::atomic<int> data;

  boost::barrier*           slave_thread_finished_barrier;
  boost::mutex              slave_thread_finished_mutex; 
  boost::condition_variable slave_thread_finished_cond;
  bool                      slave_thread_finished = false;

  struct Work {
    void signal_slave()
    {
      boost::lock_guard<boost::mutex> lock(data_ready_mutex);
      data_ready = true;
      cond.notify_all();
    }

    void slave_thread()
    {
      static boost::atomic_int _id_gen(0);
      id = _id_gen++;

      std::cout << "(" << id << ") slave_thread created\n";
      while (true) {

        boost::unique_lock<boost::mutex> lock(data_ready_mutex);
        cond.wait(lock, [&]{ return data_ready; });

        data_ready = false;

        data++;

        num_run++;

        slave_thread_finished_barrier->wait();

        // signaling the main thread that the slave threads are done.
        if (id == 0)
        {
          boost::lock_guard<boost::mutex> lock(slave_thread_finished_mutex);
          slave_thread_finished = true;
          slave_thread_finished_cond.notify_one();
        }

      }
    }

  private:
    int id = 0;
    bool data_ready = false;
    int  num_run = 0;

    boost::mutex data_ready_mutex;
    boost::condition_variable cond;

  };

}

#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>

using hrc = boost::chrono::high_resolution_clock;

int main()
{
  boost::thread_group tg;

  size_t nThreads = 10;

  slave_thread_finished_barrier = new boost::barrier(nThreads);
  std::vector<Work> works(nThreads);

  for (size_t i = 0; i < nThreads; i++) {
    tg.create_thread(boost::bind(&Work::slave_thread, boost::ref(works[i])));
  }

  while (true) {
    auto start_time = hrc::now();

    for (auto& w : works)
      w.signal_slave();

    // Wait for slave threads to finish.
    boost::unique_lock<boost::mutex> lock(slave_thread_finished_mutex);
    slave_thread_finished_cond.wait(lock, [&]{ return slave_thread_finished; });
    slave_thread_finished = false;

    std::cout << "Elapsed Time = " << (hrc::now() - start_time) << std::endl;
  }

  tg.join_all();
}

【讨论】:

  • 在这种情况下,仍然避免手动内存管理、共享全局、魔术标志(我认为完成标志被破坏):cleaned up
猜你喜欢
  • 1970-01-01
  • 2011-04-15
  • 1970-01-01
  • 2018-04-27
  • 1970-01-01
  • 2015-04-01
  • 1970-01-01
  • 2021-06-27
相关资源
最近更新 更多