【问题标题】:How to wait for completion of all tasks in this ThreadPool?如何等待此线程池中的所有任务完成?
【发布时间】:2020-03-27 09:06:57
【问题描述】:

我正在尝试写一个ThreadPool

class ThreadPool {
  public:
    ThreadPool(size_t numberOfThreads):isAlive(true) {
      for(int i =0; i < numberOfThreads; i++) {
        workerThreads.push_back(std::thread(&ThreadPool::doJob, this));
      }

      #ifdef DEBUG 
      std::cout<<"Construction Complete"<<std::endl; 
      #endif
    }

    ~ThreadPool() {
      #ifdef DEBUG 
      std::cout<<"Destruction Start"<<std::endl; 
      #endif

      isAlive = false;
      conditionVariable.notify_all();
      waitForExecution();

      #ifdef DEBUG 
      std::cout<<"Destruction Complete"<<std::endl; 
      #endif
    }

    void waitForExecution() {
      for(std::thread& worker: workerThreads) {
        worker.join();
      }
    }

    void addWork(std::function<void()> job) {
      #ifdef DEBUG 
      std::cout<<"Adding work"<<std::endl; 
      #endif
      std::unique_lock<std::mutex> lock(lockListMutex);
      jobQueue.push_back(job);
      conditionVariable.notify_one();
    }

  private:
    // performs actual work
    void doJob() {
      // try {
        while(isAlive) {
          #ifdef DEBUG 
          std::cout<<"Do Job"<<std::endl; 
          #endif

          std::unique_lock<std::mutex> lock(lockListMutex);
          if(!jobQueue.empty()) {
            #ifdef DEBUG 
            std::cout<<"Next Job Found"<<std::endl; 
            #endif

            std::function<void()> job = jobQueue.front();
            jobQueue.pop_front();
            job();
          }
          conditionVariable.wait(lock);
        }
    }

    // a vector containing worker threads
    std::vector<std::thread> workerThreads;

    // a queue for jobs
    std::list<std::function<void()>> jobQueue;

    // a mutex for synchronized insertion and deletion from list
    std::mutex lockListMutex;

    std::atomic<bool> isAlive;

    // condition variable to track whether or not there is a job in queue
    std::condition_variable conditionVariable;
};

我正在从我的主线程向这个线程池添加工作。我的问题是调用waitForExecution() 导致永远等待主线程。我需要能够在所有工作完成后终止线程并从那里继续执行主线程。我该怎么做?

【问题讨论】:

  • 顺便说一下,我注意到你的线程池一次只运行一个作业,因为当一个线程运行一个作业时,它仍然持有锁,阻止任何其他线程获取作业。跨度>
  • 感谢指出,我现在在pop_front()之后解锁互斥锁

标签: c++ multithreading c++11 c++14 threadpool


【解决方案1】:

编写健壮的线程池的第一步是将队列从线程管理中分离出来。线程安全的队列很难自己编写,并且类似地管理线程。

线程安全队列如下所示:

template<class T>
struct threadsafe_queue {
  boost::optional<T> pop() {
    std::unique_lock<std::mutex> l(m);
    cv.wait(l, [&]{ aborted || !data.empty(); } );
    if (aborted) return {};
    return data.pop_front();
  }

  void push( T t )
  {
    std::unique_lock<std::mutex> l(m);
    if (aborted) return;
    data.push_front( std::move(t) );
    cv.notify_one();
  }

  void abort()
  {
    std::unique_lock<std::mutex> l(m);
    aborted = true;
    data = {};
    cv.notify_all();
  }
  ~threadsafe_queue() { abort(); }
private:
  std::mutex m;
  std::condition_variable cv;
  std::queue< T > data;
  bool aborted = false;
};

其中pop 在队列中止时返回nullopt

现在我们的线程池很简单:

struct threadpool {
  explicit threadpool(std::size_t n) { add_threads(n); }
  threadpool() = default;
  ~threadpool(){ abort(); }

  void add_thread() { add_threads(1); }
  void add_threads(std::size_t n)
  {
    for (std::size_t i = 0; i < n; ++i)
      threads.push_back( std::thread( [this]{ do_thread_work(); } ) );
  }
  template<class F>
  auto add_task( F && f )
  {
    using R = std::result_of_t< F&() >;
    auto pptr = std::make_shared<std::promise<R>>();
    auto future = pptr.get_future();
    tasks.push([pptr]{ (*pptr)(); });
    return future;
  }
  void abort()
  {
    tasks.abort();
    while (!threads.empty()) {
      threads.back().join();
      threads.pop_back();
    }
  }
private:
  threadsafe_queue< std::function<void()> > tasks;
  std::vector< std::thread > threads;
  void do_thread_work() {
    while (auto f = tasks.pop()) {
      (*f)();
    }
  }
};

请注意,如果您中止,未完成的未来将充满违反承诺的异常。

工作线程在从中提供数据的队列中止时停止运行。 abort() 上的主线程将等待工作线程完成(这是明智的)。

这确实意味着工作线程任务也必须终止,否则主线程将挂起。没有办法避免这种情况;通常,您的工作线程的任务需要合作才能获得一条消息,说明它们应该提前中止。

Boost 有一个线程池,该线程池与其线程原语集成,并允许较少合作的中止;其中,所有互斥锁类型的操作都会隐式检查中止标志,如果看到则操作抛出。

【讨论】:

    【解决方案2】:

    我应该如何继续?

    嗯,你应该学会使用你的调试器,它会准确地告诉你你想加入的每个线程在哪里停止。

    我会告诉你看起来有什么问题,但强烈建议你先这样做。这是无价的。


    好的,现在:你的条件变量循环是错误的。

    正确的模式是行为类似于第二种形式的模式,带有谓词参数here

    while (!pred()) {
        wait(lock);
    }
    

    具体来说,如果您的谓词为真,则您必须调用wait。你可能永远不会再被唤醒,因为谓词从一开始就不会变成假的!

    试试

          // wait until we have something to do
          while(jobQueue.empty() && isAlive) {
            conditionVariable.wait(lock);
          }
    
          // unless we're exiting, we must have a job
          if (isAlive) {
            #ifdef DEBUG 
            std::cout<<"Next Job Found"<<std::endl; 
            #endif
    
            std::function<void()> job = jobQueue.front();
            jobQueue.pop_front();
            job();
          }
    

    想象一下,当您调用 notify_all 时,您的线程正在运行作业 - 在通知已经发生后它将调用 wait,并且不会再次出现。由于它在完成工作和调用wait 之间不检查isAlive,因此它将永远等待。

    即使没有关闭问题,它也是错误的,因为它应该在有工作要做的时候继续消耗工作,而不是在每次完成时阻塞。这让我想起了最后一个问题 - 您可能应该在执行作业时解锁互斥锁(然后重新锁定它) - 否则您的池是单线程的。

    【讨论】:

    • 对不起,我是 C++ 编程新手。我理解你关于在工作前解锁互斥锁的观点。此外,我现在已经按照您的建议采用了正确的条件变量模式。但是,我仍然无法理解如何在所有处理完成后继续执行主线程。因为thread.join() 只会停止线程并且工作线程永远不会终止。
    • OP 根本不应该使用wait 的单参数版本。
    • @AbhishekBansal - 一旦isAlive 变为假,上面的代码将正确退出线程,因此join 不应再永远等待。 Yakk 是正确的,您应该使用谓词形式 wait btw - 我只做了最小的更改来显示您的代码出错的地方。
    猜你喜欢
    • 2011-06-02
    • 2019-10-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-29
    • 2010-10-16
    • 2022-01-19
    相关资源
    最近更新 更多