【问题标题】:Implementation of the Wait operation for a thread pool线程池等待操作的实现
【发布时间】:2013-12-02 19:31:52
【问题描述】:

我已经实现了一个线程池。现在它的基本操作如下:

void 初始化(bool detached_threads);

bool dispatch( void *(*dispatch_fn)(void*) , void * arg , bool free_arg );

void shutdown();

静态 void * execute_task( void * arg );

现在我想添加操作wait(),它将由主线程调用,并等待线程池中的所有线程完成它们正在执行的任务。我不想使用 pthread_join 因为这会杀死所有线程并且我不想再次创建线程池。我已经按照下面提供的代码实现了等待操作,但似乎不正确。

请给我建议什么是错的。谢谢!!!

#include "../inc/ThreadPool.hpp"

#include <cstdio>
#include <cstdlib>
#include <iostream>

using namespace std;

ThreadPool::ThreadPool( unsigned int n )
:num_threads(n)
{
    if(num_threads<=0)
    {
    num_threads = DEFAULT_THREAD_POOL_SIZE;
    }

    barrier_count = 0;
    threads = (pthread_t*) malloc(sizeof(pthread_t)*num_threads);
    shutdown = false;
    dont_accept = false;

    pthread_mutex_init(&barrier_lock,NULL);
    pthread_cond_init(&barrier_reached,NULL);

    pthread_mutex_init(&q_lock,NULL);
    pthread_cond_init(&q_not_empty,NULL);
    pthread_cond_init(&q_empty,NULL);
}

ThreadPool::~ThreadPool()
{
  //cout << "~ThreadPool()" << endl; 
}

void ThreadPool::initialise( bool detached_threads )
{
    //pthread_attr_t attr;

    //if(detached_threads)
    //{
    //pthread_attr_init(&attr);
    //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    //}

    for ( int i = 0 ; i<num_threads ; i++ )
    {
      pthread_create( &threads[i] , NULL , execute_task , this );
    }
}

bool ThreadPool::dispatch( void *(*routine)(void*) , void * arg , bool free_arg )
{
    task_t * new_task = (task_t*) malloc(sizeof(task_t));
    new_task->routine = routine;
    new_task->arg = arg;
    new_task->free_arg = free_arg;

    pthread_mutex_lock(&q_lock);

    if(dont_accept)
    {
        free(new_task);
        return false;
    }

    bool was_empty = tasks.empty();
    tasks.push(new_task);

    if(was_empty)
    {
        pthread_cond_signal(&q_not_empty);
    }

    pthread_mutex_unlock(&q_lock);

    return true;
}

void ThreadPool::shut_down()
{
      void * return_val;
      pthread_mutex_lock(&q_lock);
      dont_accept = true;

      while(!(tasks.empty()))
      {
      pthread_cond_wait(&q_empty,&q_lock);
      }

      shutdown = true;
      pthread_cond_broadcast(&q_not_empty);
      pthread_mutex_unlock(&q_lock);

      for(int i=0 ; i<num_threads ; i++)
      {
      //pthread_join(threads[i],NULL);
      pthread_join(threads[i],&return_val);
      }

      free(threads);

      pthread_mutex_destroy(&barrier_lock);
      pthread_cond_destroy(&barrier_reached);

      pthread_mutex_destroy(&q_lock);
      pthread_cond_destroy(&q_empty);
      pthread_cond_destroy(&q_not_empty);
}

void ThreadPool::init_barrier()
{
    pthread_mutex_lock(&barrier_lock);

    barrier_count = 0;

    pthread_mutex_unlock(&barrier_lock);
}

void ThreadPool::barrier( int ns )
{
    pthread_mutex_lock(&barrier_lock);

    barrier_count++;

    if(barrier_count==ns)
    {
        for( int i=0 ; i<ns ; i++ )
        {
            pthread_cond_signal(&barrier_reached);
        }
    }else
    {
        while( barrier_count<ns )
        {
            pthread_cond_wait(&barrier_reached,&barrier_lock);
        }
    }

    pthread_mutex_unlock(&barrier_lock);
}

void ThreadPool::wait()
{
      pthread_mutex_lock(&q_lock);

      while(!(tasks.empty()))
      {
      pthread_cond_wait(&q_empty,&q_lock);
      }

      pthread_mutex_unlock(&q_lock);
}


void * ThreadPool::execute_task( void * arg )
{
    ThreadPool * thread_pool = (ThreadPool*) arg;
    task_t * cur_task;

    while(true)
    {
    pthread_mutex_lock(&(thread_pool->q_lock));

    while((thread_pool->tasks).empty())
    {
          if(thread_pool->shutdown)
          {
          pthread_mutex_unlock(&(thread_pool->q_lock));
          pthread_exit(NULL);
          }

          //cout << "I'm going to sleep!!!" << endl;

          pthread_cond_wait(&(thread_pool->q_not_empty),&(thread_pool->q_lock));

          //cout << "I've woken up!!!" << endl;

          if(thread_pool->shutdown)
          {
          pthread_mutex_unlock(&(thread_pool->q_lock));
          pthread_exit(NULL);
          }
    }

    cur_task = thread_pool->tasks.front();
    thread_pool->tasks.pop();

    if(thread_pool->tasks.empty() && !thread_pool->shutdown )
    {
          pthread_cond_signal(&(thread_pool->q_empty));
    }

    pthread_mutex_unlock(&(thread_pool->q_lock));

    //cout << "I'm executing a task!!!" << endl;

    (cur_task->routine)(cur_task->arg);

    if(cur_task->free_arg)
    {
           free(cur_task->arg);
    }

    free(cur_task);

    //cout << "I'm done with the task!!!" << endl;
    }
}

【问题讨论】:

  • 这听起来不太好,线程池永远不应该等待所有线程完成。这会阻止它完成它的工作。使用线程池的应用程序很可能对要完成的特定 tp 线程集感兴趣,它通过让从池中获取的线程发出完成信号来实现这一点。充其量您可能希望在应用程序终止时执行此操作。也不要那样做,死锁的几率很高。
  • @Hans Passant 我知道这一点。但是我正在开发的应用程序需要一个带有 wait() 操作的线程池。我的目标不是实现具有这种行为的线程池,而是在我的应用程序中使用这种操作。

标签: multithreading synchronization pthreads threadpool


【解决方案1】:

好吧,我通常做的是从线程池请求一个“TasksetWait”(TW)对象,通过“dispatch”方法发出任务,然后,为了同步通知,调用“AwaitCompletion()”方法。 TW 提供了一个已为请求线程锁定的私有互斥锁(确保它现在具有独占访问权限)、一个任务计数器 int、一个供请求者等待的“已完成”condvar/事件以及对其池的引用。 TW dispatch 将任务转发到池中,方法是在每个任务中加载一个自身的 ref,将任务推送到它的池中,并通过增加任务计数器来计数它们。

然后请求线程调用 TW->AwaitCompletion,它解锁互斥锁并等待事件。

同时,池线程正在执行任务 run() 方法。 run() 返回后,任务调用 TW 的 'OnCompletion() 方法,该方法锁定互斥体并减少计数。如果计数仍然非零,则解锁互斥锁并退出。如果计数为零,则解锁互斥锁,发出事件信号并退出。

当请求者再次运行时,它可以将 TW 返回到池中(可能会保留它们的缓存),或者直接销毁它。

一种变体是请求者向 TW 提供一个 'OnCompletion 方法,以便完成最后一个任务的池线程可以调用它,因此提供异步通知,(可能需要将消息发布到 GUI 输入队列)。

这样的机制允许线程池被多个请求者线程使用,或者(通过异步通知)请求者发出多个任务块,但如果请求者线程本身就是一个池线程,它可能会有点混乱在池中运行任务,(如果您想了解进程实际在做什么,最好避免这种情况:)。

【讨论】:

    猜你喜欢
    • 2015-08-29
    • 2010-10-16
    • 2015-07-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-03-18
    • 2016-06-02
    相关资源
    最近更新 更多