【问题标题】:Extend the life of threads with synchronization (C++11)通过同步延长线程的寿命 (C++11)
【发布时间】:2013-02-21 12:51:37
【问题描述】:

我有一个程序,其函数将指针作为 arg 和 main。主要是创建 n 个线程,每个线程根据传递的arg 在不同的内存区域上运行函数。然后加入线程,主线程在区域之间执行一些数据混合,并创建 n 个新线程,它们执行与旧线程相同的操作。

为了改进程序,我希望保持线程处于活动状态,从而消除创建线程所需的长时间。线程应该在 main 工作时休眠,并在它们必须再次出现时通知。同样,当线程工作时,main 应该等待,就像它对 join 所做的那样。

我最终无法对此进行强有力的实施,总是陷入僵局。

简单的基线代码,任何关于如何修改它的提示将不胜感激

#include <thread>
#include <climits>

...

void myfunc(void * p) {
  do_something(p);
}

int main(){
  void * myp[n_threads] {a_location, another_location,...};
  std::thread mythread[n_threads];
  for (unsigned long int j=0; j < ULONG_MAX; j++) {
    for (unsigned int i=0; i < n_threads; i++) {
      mythread[i] = std::thread(myfunc, myp[i]);
    }
    for (unsigned int i=0; i < n_threads; i++) {
      mythread[i].join();
    }
    mix_data(myp); 
  }
  return 0;
}

【问题讨论】:

  • 死锁到底在哪里?是在mythread[i].join(); 上吗?
  • 不,这是我现在拥有的无法死锁的代码示例。我找不到更好的实现(不会删除加入它们的线程)不会死锁。
  • 你在找thread pools吗?
  • 池可能是一个(不明显的)解决方案,但我认为这是一种矫枉过正。我也在寻找一种执行速度非常快且实现简单的方法,大概mutex和conditional_variable应该是第一个尝试的方法。
  • @DarioP - 是的,你做到了,诚实 :)

标签: c++ multithreading c++11 thread-safety


【解决方案1】:

以下是执行一些随机操作的简单编译和工作代码。它实现了 aleguna 的屏障概念。每个线程的任务长度不同,所以确实需要一个强大的同步机制。我将尝试对相同的任务进行池化并对结果进行基准测试,然后可能会使用 Andy Prowl 指出的未来。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <complex>
#include <random>

const unsigned int n_threads=4; //varying this will not (almost) change the total amount of work
const unsigned int task_length=30000/n_threads;
const float task_length_variation=task_length/n_threads;
unsigned int rep=1000; //repetitions of tasks

class t_chronometer{
 private: 
  std::chrono::steady_clock::time_point _t;

 public:
  t_chronometer(): _t(std::chrono::steady_clock::now()) {;}
  void reset() {_t = std::chrono::steady_clock::now();}
  double get_now() {return std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - _t).count();}
  double get_now_ms() {return 
      std::chrono::duration_cast<std::chrono::duration<double,std::milli>>(std::chrono::steady_clock::now() - _t).count();}
};

class t_barrier {
 private:
   std::mutex m_mutex;
   std::condition_variable m_cond;
   unsigned int m_threshold;
   unsigned int m_count;
   unsigned int m_generation;

 public:
   t_barrier(unsigned int count):
    m_threshold(count),
    m_count(count),
    m_generation(0) {
   }

   bool wait() {
      std::unique_lock<std::mutex> lock(m_mutex);
      unsigned int gen = m_generation;

      if (--m_count == 0)
      {
          m_generation++;
          m_count = m_threshold;
          m_cond.notify_all();
          return true;
      }

      while (gen == m_generation)
          m_cond.wait(lock);
      return false;
   }
};


using namespace std;

void do_something(complex<double> * c, unsigned int max) {
  complex<double> a(1.,0.);
  complex<double> b(1.,0.);
  for (unsigned int i = 0; i<max; i++) {
    a *= polar(1.,2.*M_PI*i/max);
    b *= polar(1.,4.*M_PI*i/max);
    *(c)+=a+b;
  }
}

bool done=false;
void task(complex<double> * c, unsigned int max, t_barrier* start_barrier, t_barrier* end_barrier) {
  while (!done) {
    start_barrier->wait ();
    do_something(c,max);
    end_barrier->wait ();
  }
  cout << "task finished" << endl;
}

int main() {
  t_chronometer t;

  std::default_random_engine gen;
  std::normal_distribution<double> dis(.0,1000.0);

  complex<double> cpx[n_threads];
  for (unsigned int i=0; i < n_threads; i++) {
    cpx[i] = complex<double>(dis(gen), dis(gen));
  }

  t_barrier start_barrier (n_threads + 1); // child threads + main thread
  t_barrier end_barrier (n_threads + 1); // child threads + main thread

  std::thread mythread[n_threads];
  unsigned long int sum=0;
  for (unsigned int i=0; i < n_threads; i++) {
    unsigned int max = task_length +  i * task_length_variation;
    cout << i+1 << "th task length: " << max << endl;
    mythread[i] = std::thread(task, &cpx[i], max, &start_barrier, &end_barrier);
    sum+=max;
  }
  cout << "total task length " << sum << endl;

  complex<double> c(0,0);
  for (unsigned long int j=1; j < rep+1; j++) {
    start_barrier.wait (); //give to the threads the missing call to start
    if (j==rep) done=true;
    end_barrier.wait (); //wait for the call from each tread
    if (j%100==0) cout << "cycle: " << j << endl;
    for (unsigned int i=0; i<n_threads; i++) {
      c+=cpx[i];
    }
  }
  for (unsigned int i=0; i < n_threads; i++) {
    mythread[i].join();
  }
  cout << "result: " << c << " it took: " << t.get_now() << " s." << endl;
  return 0;
}

【讨论】:

  • 我想说屏障解决方案并没有提供很大的性能提升。对于目前的实现,加入线程并创建新线程几乎与使用 cv 通知它们的时间相同。
【解决方案2】:

你想要的概念是线程池。这个SO question 处理现有的实现。

这个想法是为多个线程实例提供一个容器。每个实例都与一个轮询任务队列的函数相关联,当任务可用时,拉取并运行它。一旦任务结束(如果它终止,但这是另一个问题),线程简单地循环到任务队列。

所以你需要一个同步队列,一个实现队列循环的线程类,一个任务对象的接口,也许还有一个驱动整个事物的类(池类)。

或者,您可以为它必须执行的任务创建一个非常专业的线程类(例如,只有内存区域作为参数)。这需要线程的通知机制来指示它们已完成当前迭代。

线程主函数将是该特定任务的循环,在一次迭代结束时,线程发出结束信号,并等待条件变量开始下一个循环。本质上,您将在线程中内联任务代码,完全不需要队列。

 using namespace std;

 // semaphore class based on C++11 features
 class semaphore {
     private:
         mutex mMutex;
         condition_variable v;
         int mV;
     public:
         semaphore(int v): mV(v){}
         void signal(int count=1){
             unique_lock lock(mMutex);
             mV+=count;
             if (mV > 0) mCond.notify_all();
         }
         void wait(int count = 1){
             unique_lock lock(mMutex);
             mV-= count;
             while (mV < 0)
                 mCond.wait(lock);
         }
 };

template <typename Task>
class TaskThread {
     thread mThread;
     Task *mTask;
     semaphore *mSemStarting, *mSemFinished;
     volatile bool mRunning;
    public:
    TaskThread(Task *task, semaphore *start, semaphore *finish): 
         mTask(task), mRunning(true), 
         mSemStart(start), mSemFinished(finish),
        mThread(&TaskThread<Task>::psrun){}
    ~TaskThread(){ mThread.join(); }

    void run(){
        do {
             (*mTask)();
             mSemFinished->signal();
             mSemStart->wait();
        } while (mRunning);
    }

   void finish() { // end the thread after the current loop
         mRunning = false;
   }
private:
    static void psrun(TaskThread<Task> *self){ self->run();}
 };

 classcMyTask {
     public:
     MyTask(){}
    void operator()(){
        // some code here
     }
 };

int main(){
    MyTask task1;
    MyTask task2;
    semaphore start(2), finished(0);
    TaskThread<MyTask> t1(&task1, &start, &finished);
    TaskThread<MyTask> t2(&task2, &start, &finished);
    for (int i = 0; i < 10; i++){
         finished.wait(2);
         start.signal(2);
    }
    t1.finish();
    t2.finish();
}

上面提出的(粗略的)实现依赖于Task 类型,该类型必须提供operator()(即类仿函数)。之前说过可以将任务代码直接合并到线程函数体中,但由于我不知道,所以我尽量保持抽象。线程开始有一个条件变量,线程结束有一个条件变量,两者都封装在信号量实例中。

看到另一个建议使用 boost::barrier 的答案,我只能支持这个想法:如果可能,请确保将我的信号量类替换为该类,原因是它最好依靠经过良好测试和维护的外部代码,而不是针对相同功能集自行实现的解决方案。

总而言之,这两种方法都是有效的,但前者为了灵活性而放弃了一点点性能。如果要执行的任务需要足够长的时间,则管理和队列同步成本可以忽略不计。

更新:代码已修复和测试。用信号量替换了一个简单的条件变量。

【讨论】:

  • 这就是我在 cmets 中的建议,但显然 OP 认为它们不符合他的需求。您能否解决他对线程池可能过度杀伤力的担忧?
  • 我想我或多或少做到了。我没有提供示例实现,因为我正在使用我的手机 atm,而且我对 c++ std::threads 和条件变量没有足够的了解。自信地写出来。欢迎编辑。
  • 这是寻找池的第二个建议。我会考虑这个,但仍然必须有一个基于互斥体和条件变量的解决方案。我想从这个角度来解决这个问题。
  • @DarioP 线程池基于互斥体和条件变量,它们只是隐藏了一些漂亮干净的界面后面的杂物。
  • 是的 - 请使用泳池方法。微管理线程几乎总是失败,通常很糟糕。
【解决方案3】:

这是一种仅使用 C++11 标准库中的类的可能方法。基本上,您创建的每个线程都有一个关联的命令队列(封装在std::packaged_task&lt;&gt; 对象中),它会不断检查。如果队列为空,线程将只等待条件变量 (std::condition_variable)。

虽然通过使用std::mutexstd::unique_lock&lt;&gt; RAII 包装器可以避免数据争用,但主线程可以通过存储与每个提交的std::packaged_tast&lt;&gt; 关联的std::future&lt;&gt; 对象并调用wait() 就可以了。

下面是一个遵循这种设计的简单程序。评论应该足以解释它的作用:

#include <thread>
#include <iostream>
#include <sstream>
#include <future>
#include <queue>
#include <condition_variable>
#include <mutex>

// Convenience type definition
using job = std::packaged_task<void()>;

// Some data associated to each thread.
struct thread_data
{
    int id; // Could use thread::id, but this is filled before the thread is started
    std::thread t; // The thread object
    std::queue<job> jobs; // The job queue
    std::condition_variable cv; // The condition variable to wait for threads
    std::mutex m; // Mutex used for avoiding data races
    bool stop = false; // When set, this flag tells the thread that it should exit
};

// The thread function executed by each thread
void thread_func(thread_data* pData)
{
    std::unique_lock<std::mutex> l(pData->m, std::defer_lock);
    while (true)
    {
        l.lock();

        // Wait until the queue won't be empty or stop is signaled
        pData->cv.wait(l, [pData] () {
            return (pData->stop || !pData->jobs.empty()); 
            });

        // Stop was signaled, let's exit the thread
        if (pData->stop) { return; }

        // Pop one task from the queue...
        job j = std::move(pData->jobs.front());
        pData->jobs.pop();

        l.unlock();

        // Execute the task!
        j();
    }
}

// Function that creates a simple task
job create_task(int id, int jobNumber)
{
    job j([id, jobNumber] ()
    {
        std::stringstream s;
        s << "Hello " << id << "." << jobNumber << std::endl;
        std::cout << s.str();
    });

    return j;
}

int main()
{
    const int numThreads = 4;
    const int numJobsPerThread = 10;
    std::vector<std::future<void>> futures;

    // Create all the threads (will be waiting for jobs)
    thread_data threads[numThreads];
    int tdi = 0;
    for (auto& td : threads)
    {
        td.id = tdi++;
        td.t = std::thread(thread_func, &td);
    }

    //=================================================
    // Start assigning jobs to each thread...

    for (auto& td : threads)
    {
        for (int i = 0; i < numJobsPerThread; i++)
        {
            job j = create_task(td.id, i);
            futures.push_back(j.get_future());

            std::unique_lock<std::mutex> l(td.m);
            td.jobs.push(std::move(j));
        }

        // Notify the thread that there is work do to...
        td.cv.notify_one();
    }

    // Wait for all the tasks to be completed...
    for (auto& f : futures) { f.wait(); }
    futures.clear();


    //=================================================
    // Here the main thread does something...

    std::cin.get();

    // ...done!
    //=================================================


    //=================================================
    // Posts some new tasks...

    for (auto& td : threads)
    {
        for (int i = 0; i < numJobsPerThread; i++)
        {
            job j = create_task(td.id, i);
            futures.push_back(j.get_future());

            std::unique_lock<std::mutex> l(td.m);
            td.jobs.push(std::move(j));
        }

        // Notify the thread that there is work do to...
        td.cv.notify_one();
    }

    // Wait for all the tasks to be completed...
    for (auto& f : futures) { f.wait(); }
    futures.clear();

    // Send stop signal to all threads and join them...
    for (auto& td : threads)
    {
        std::unique_lock<std::mutex> l(td.m);
        td.stop = true;
        td.cv.notify_one();
    }

    // Join all the threads
    for (auto& td : threads) { td.t.join(); }
}

【讨论】:

  • 是的!非常感谢..这个解决方案并不像障碍那么简单,但仍然不难理解、实施和控制。发帖的时候我脑子里就有这样的想法,现在我需要看看这是否真的是我想要的。
  • @DarioP:很高兴它有帮助!事实上,这可能不是最简单的解决方案,但在设计方面,我认为这是简单性和灵活性之间的公平折衷。例如,由于上面的程序设置了一组运行线程,每个线程都有自己的作业队列,因此很容易从中构建一个线程池类。此外,通过使用 C++11 并发库的其他类(例如 std::promise),您可以更好地控制作业的进度,而无需建立进一步的通信协议(例如查看 partial工作结果可用)。
【解决方案4】:

它可以很容易地使用屏障来实现(只是对条件变量和计数器的方便包装)。它基本上会阻塞,直到所有 N 个线程都达到“障碍”。然后它再次“回收”。 Boost 提供了一个实现。

void myfunc(void * p, boost::barrier& start_barrier, boost::barrier& end_barrier) {
  while (!stop_condition) // You'll need to tell them to stop somehow
  {
      start_barrier.wait ();
      do_something(p);
      end_barrier.wait ();
  }
}

int main(){
  void * myp[n_threads] {a_location, another_location,...};

  boost::barrier start_barrier (n_threads + 1); // child threads + main thread
  boost::barrier end_barrier (n_threads + 1); // child threads + main thread

  std::thread mythread[n_threads];

    for (unsigned int i=0; i < n_threads; i++) {
      mythread[i] = std::thread(myfunc, myp[i], start_barrier, end_barrier);
    }

  start_barrier.wait (); // first unblock the threads

  for (unsigned long int j=0; j < ULONG_MAX; j++) {
    end_barrier.wait (); // mix_data must not execute before the threads are done
    mix_data(myp); 
    start_barrier.wait (); // threads must not start new iteration before mix_data is done
  }
  return 0;
}

【讨论】:

  • 这是一个科学软件,原则上应该在许多不同的机器上编译和运行,并且很容易适应在集群上运行。出于可移植性的目的,我想只使用标准库或至少 POSIX,因此避免提升。然而,这个解决方案真的非常非常好和简单!我会认真考虑的,谢谢!
  • @DarioP,barrier 很容易实现十几行代码。您可以从此处复制粘贴 boost.org/doc/libs/1_53_0/boost/thread/barrier.hpp 只需将 boost::mutexconditional_variable 替换为 posix 等效项
  • 是的,实现屏障类应该很容易。我还改进了一点你的代码。你觉得这样好吗?
  • @DarioP,不是不行。随着您的更正,代码将导致死锁
  • 事实上,这是我在几个周期内获得的。现在我必须弄清楚为什么。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-07-15
  • 1970-01-01
  • 2011-06-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多