【发布时间】:2013-12-02 19:31:52
【问题描述】:
我已经实现了一个线程池。现在它的基本操作如下:
void 初始化(bool detached_threads);
bool dispatch( void *(*dispatch_fn)(void*) , void * arg , bool free_arg );
void shutdown();
静态 void * execute_task( void * arg );
现在我想添加操作wait(),它将由主线程调用,并等待线程池中的所有线程完成它们正在执行的任务。我不想使用 pthread_join 因为这会杀死所有线程并且我不想再次创建线程池。我已经按照下面提供的代码实现了等待操作,但似乎不正确。
请给我建议什么是错的。谢谢!!!
#include "../inc/ThreadPool.hpp"
#include <cstdio>
#include <cstdlib>
#include <iostream>
using namespace std;
ThreadPool::ThreadPool( unsigned int n )
:num_threads(n)
{
if(num_threads<=0)
{
num_threads = DEFAULT_THREAD_POOL_SIZE;
}
barrier_count = 0;
threads = (pthread_t*) malloc(sizeof(pthread_t)*num_threads);
shutdown = false;
dont_accept = false;
pthread_mutex_init(&barrier_lock,NULL);
pthread_cond_init(&barrier_reached,NULL);
pthread_mutex_init(&q_lock,NULL);
pthread_cond_init(&q_not_empty,NULL);
pthread_cond_init(&q_empty,NULL);
}
ThreadPool::~ThreadPool()
{
//cout << "~ThreadPool()" << endl;
}
void ThreadPool::initialise( bool detached_threads )
{
//pthread_attr_t attr;
//if(detached_threads)
//{
//pthread_attr_init(&attr);
//pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
//}
for ( int i = 0 ; i<num_threads ; i++ )
{
pthread_create( &threads[i] , NULL , execute_task , this );
}
}
bool ThreadPool::dispatch( void *(*routine)(void*) , void * arg , bool free_arg )
{
task_t * new_task = (task_t*) malloc(sizeof(task_t));
new_task->routine = routine;
new_task->arg = arg;
new_task->free_arg = free_arg;
pthread_mutex_lock(&q_lock);
if(dont_accept)
{
free(new_task);
return false;
}
bool was_empty = tasks.empty();
tasks.push(new_task);
if(was_empty)
{
pthread_cond_signal(&q_not_empty);
}
pthread_mutex_unlock(&q_lock);
return true;
}
void ThreadPool::shut_down()
{
void * return_val;
pthread_mutex_lock(&q_lock);
dont_accept = true;
while(!(tasks.empty()))
{
pthread_cond_wait(&q_empty,&q_lock);
}
shutdown = true;
pthread_cond_broadcast(&q_not_empty);
pthread_mutex_unlock(&q_lock);
for(int i=0 ; i<num_threads ; i++)
{
//pthread_join(threads[i],NULL);
pthread_join(threads[i],&return_val);
}
free(threads);
pthread_mutex_destroy(&barrier_lock);
pthread_cond_destroy(&barrier_reached);
pthread_mutex_destroy(&q_lock);
pthread_cond_destroy(&q_empty);
pthread_cond_destroy(&q_not_empty);
}
void ThreadPool::init_barrier()
{
pthread_mutex_lock(&barrier_lock);
barrier_count = 0;
pthread_mutex_unlock(&barrier_lock);
}
void ThreadPool::barrier( int ns )
{
pthread_mutex_lock(&barrier_lock);
barrier_count++;
if(barrier_count==ns)
{
for( int i=0 ; i<ns ; i++ )
{
pthread_cond_signal(&barrier_reached);
}
}else
{
while( barrier_count<ns )
{
pthread_cond_wait(&barrier_reached,&barrier_lock);
}
}
pthread_mutex_unlock(&barrier_lock);
}
void ThreadPool::wait()
{
pthread_mutex_lock(&q_lock);
while(!(tasks.empty()))
{
pthread_cond_wait(&q_empty,&q_lock);
}
pthread_mutex_unlock(&q_lock);
}
void * ThreadPool::execute_task( void * arg )
{
ThreadPool * thread_pool = (ThreadPool*) arg;
task_t * cur_task;
while(true)
{
pthread_mutex_lock(&(thread_pool->q_lock));
while((thread_pool->tasks).empty())
{
if(thread_pool->shutdown)
{
pthread_mutex_unlock(&(thread_pool->q_lock));
pthread_exit(NULL);
}
//cout << "I'm going to sleep!!!" << endl;
pthread_cond_wait(&(thread_pool->q_not_empty),&(thread_pool->q_lock));
//cout << "I've woken up!!!" << endl;
if(thread_pool->shutdown)
{
pthread_mutex_unlock(&(thread_pool->q_lock));
pthread_exit(NULL);
}
}
cur_task = thread_pool->tasks.front();
thread_pool->tasks.pop();
if(thread_pool->tasks.empty() && !thread_pool->shutdown )
{
pthread_cond_signal(&(thread_pool->q_empty));
}
pthread_mutex_unlock(&(thread_pool->q_lock));
//cout << "I'm executing a task!!!" << endl;
(cur_task->routine)(cur_task->arg);
if(cur_task->free_arg)
{
free(cur_task->arg);
}
free(cur_task);
//cout << "I'm done with the task!!!" << endl;
}
}
【问题讨论】:
-
这听起来不太好,线程池永远不应该等待所有线程完成。这会阻止它完成它的工作。使用线程池的应用程序很可能对要完成的特定 tp 线程集感兴趣,它通过让从池中获取的线程发出完成信号来实现这一点。充其量您可能希望在应用程序终止时执行此操作。也不要那样做,死锁的几率很高。
-
@Hans Passant 我知道这一点。但是我正在开发的应用程序需要一个带有 wait() 操作的线程池。我的目标不是实现具有这种行为的线程池,而是在我的应用程序中使用这种操作。
标签: multithreading synchronization pthreads threadpool