1 线程池是什么?

在多任务并发执行的时候往往需要开启很多线程来执行。而一个线程的创建和销毁是需要消耗一部分计算机资源的,而如果一个线程执行任务的资源消耗和创建一个线程的消耗差不多的情况下,那简直太浪费资源了。所以如果有已经创建好的一堆线程等着执行任务,有任务来了,调用一个线程去执行就可以了,不用重新创建一个线程。这样可以省去很多资源消耗。

而线程池就是创建了若干个等待执行任务的线程的容器。线程池就是一个线程的容器,线程池负责容纳,管理,甚至调度线程的执行和任务的分配(其中线程的调度和任务的分配不一定是由线程池来完成这个根据实现的不同有不同的责任分配)。

线程池的基本运行过程是管理一个任务队列,一个线程队列,然后每次去一个任务分配给一个线程去做,一直这样循环。

2 线程池实现原理

前面说过要在线程池中线程是复用的,而任务是不断更换的。但是这一点在语言层面是不支持的,因为一般的thread都是执行一个固定的task函数,当执行完毕后该线程就结束了,然后销毁。所以如何实现task和thread的分配是一个关键的问题。

这里一般有两种解决办法
1、一种是让每一个线程都执行任务调度函数,循环获取一个task,然后执行。
2、每一个形成处于等待任务状态,另有一个主线程来进行任务调度。

其中第一种方法比较简单,实现起来也非常容易,但是该方法有个缺点就是如果线程比较多的情况下会存在对任务队列访问的竞争,从而降低效率。所以这里以第二种方式实现线程池。

大概规划如下:
1、需要对线程进行封装,做好线程自己的同步。
2、需要一个线程容器,比如队列或者列表之类
3、任务队列,如果有优先级要求的化可以加入优先级评价体系。任务队列是一个典型的生产者和消费者模型。
4、需要一个任务调度线程
5、每个工作线程绑定一个任务,如果没有任务的情况下线程处于阻塞状态。当接受到一个任务后线程唤醒然后执行任务。

3 线程池实现

需要声明的一点是该线程池的实现使用了大量的C++11中的内容,编译器用的是vs2017(对C++11支持比较友好)

其中用到的C++11中的内容有:

1、thread
2、mutex
3、condition_variable
4、atomic
5、unique_lock

实现的线程池UML图如下图所示:

C++11实现线程池

3.1 Task类
// 类的定义
class Task
{
public:
	
	Task() {};
	~Task() {};
	virtual void run() = 0;
};

在创建任务的时候需要继承该类,然后重写run()函数。

3.2 WorkThread类
// 类的定义
class WorkThread
{
	std::thread m_thread;		// 工作线程
	Task * m_task;				// 任务指针
	std::mutex m_mutexThread;	// 关于任务是否执行的互斥量
	std::mutex m_mutexCondition;// 条件互斥量
	std::mutex m_mutexTask;		// 关于分配任务的互斥量
	std::condition_variable m_condition;	// 任务条件变量
	std::atomic<bool> m_bRunning;			// 是否运行的标志位
	bool m_bStop;
protected:
	virtual void run();				// 线程函数
public:
	WorkThread();
	~WorkThread();

	void assign(Task *task);		// 分配任务
	std::thread::id getThreadID();  // 获取线程ID
	void stop();					// 停止线程运行,其实就是结束线程运行			
	void notify();					// 通知阻塞线程
	void notify_all();				// 通知所有的阻塞线程
	bool isExecuting();				// 是否在执行任务
};

m_thread:是该类管理的一个线程。
m_task:是关联的一个任务。
run():是被管理的一个线程的线程函数
assign():给线程分配任务
getThreadID();获取线程ID
stop():该线程暂停接受任务
notify():通知阻塞(因等待任务而阻塞)的线程执行任务
notify_all():通知所有阻塞的线程,其实该函数没用,因为该类中只管理一个线程
isExecuting():判断该线程是否正在执行任务

其中run函数定义如下:

void WorkThread::run()
{
	while (true)
	{
		if (!m_bRunning.load())
		{
			m_mutexTask.lock();
			if (nullptr == m_task)
			{
				m_mutexTask.unlock();
				break;
			}
			m_mutexTask.unlock();
		}
		Task * task = nullptr;
		// 等待任务,如果没有任务并且线程也没有退出的话线程会阻塞
		
		{
			std::unique_lock<std::mutex> lock(m_mutexTask);
			// 等待信号
			m_condition.wait(lock, 
				[this]() {
				return !((nullptr == m_task) && this->m_bRunning.load()); 
			});
			task = m_task;
			m_task = nullptr;
		}
		if (nullptr == task)
		{
			continue;
		}
		task->run();// 执行任务
		delete task;// 释放task内存
		task = nullptr;
	}
}
3.3 LeisureThreadList类
// 类定义如下
class LeisureThreadList
{
	std::list<WorkThread *> m_threadList;	// 线程列表
	std::mutex m_mutexThread;				// 线程列表访问互斥量
	void assign(const size_t counts);		// 创建线程
public:
	LeisureThreadList(const size_t counts);
	~LeisureThreadList();					
	void push(WorkThread * thread);			// 添加线程
	WorkThread * top();						// 返回第一个线程指针
	void pop();								// 删除第一个线程
	size_t size();							// 返回线程个数
	void stop();							// 停止运行
};
3.4 ThreadPool类
// 类的定义
class ThreadPool
{
	std::thread m_thread;	// 线程池的任务分配线程
	LeisureThreadList m_leisureThreadList;				// 线程列表
	std::queue<Task *, std::list<Task *>> m_taskList;	// 任务队列
	std::atomic<bool> m_bRunning;				// 是否运行
	std::atomic<bool> m_bEnd;					// 是否结束运行
	std::atomic<size_t> m_threadCounts;			// 线程总数
	std::condition_variable m_condition_task;	// 任务条件
	std::condition_variable m_condition_thread;	// 线程列表条件
	std::condition_variable m_condition_running;// 运行条件变量
	std::mutex m_runningMutex;					// 运行控制变量
	std::mutex m_mutexThread;					// 空闲互斥量
	std::mutex m_taskMutex;						// 访问任务列表互斥量
	bool m_bExit;								// 是否退出标志位

	void run();					// 线程池主线程函数
public:
	ThreadPool(const size_t counts);
	~ThreadPool();
	size_t threadCounts();
	bool isRunning();			// 线程是否正在运行任务
	void addTask(Task *task);	// 添加任务
	// 线程池开始调度任务,线程池创建后不用调用该函数。该函数需要和stop()
	// 配合使用。
	void start();				// 线程池开始执行
	// 线程池暂停任务调度,但是不影响任务添加。想要开始任务调度,则调用
	// start()函数
	void stop();				// 线程池暂停运行
	// 该函数会在线程池中的所有任务都分派出去后结束线程池的线程运行,
	// 同样的线程列表中的线程会在自己的任务执行完后,然后再退出。
	void exit();				// 线程池退出
};

m_thread:线程池的任务分配线程。
m_leisureThreadList:线程列表。
m_taskList:任务队列。

run():线程池工作线程的线程函数,该函数主要进行任务调度。
threadCounts:返回共有多少个线程。
isRunning():是否正在运行。
addTask():添加任务。
start():线程池开始分配任务,通常在使用stop()后,想要继续分派执行任务,则调用该函数。
stop():是现线程池暂停任务分派,但是仍然可以添加任务。如果需要继续分派任务的话则调用start()。
exit():使线程池退出,在线程池使用完毕,想要结束的时候需要调用该函数,该函数可以保证线程池中的所有任务被执行完毕,所有线程能够执行完毕。

其中run()函数的定义如下:

void ThreadPool::run()
{
	while (true)
	{
		if (!m_bEnd.load())
		{
			m_taskMutex.lock();
			if (m_taskList.empty())
			{
				m_taskMutex.unlock();
				break;
			}
			m_taskMutex.unlock();
		}
		// 暂停执行,则阻塞
		
		{
			std::unique_lock<std::mutex> lockRunning(m_runningMutex);
			m_condition_running.wait(lockRunning,
				[this]() {return this->m_bRunning.load(); });
		}
		WorkThread *thread = nullptr;
		Task * task = nullptr;
		
		{
			std::unique_lock<std::mutex> lock(m_taskMutex);
			// 如果没有任务且没有结束则阻塞
			m_condition_task.wait(lock,
				[this]() {
				return !(this->m_taskList.empty() && this->m_bEnd.load());
			});
			task = m_taskList.front();
			m_taskList.pop();
		}
		// 选择空闲的线程执行任务
		do {
			thread = m_leisureThreadList.top();
			m_leisureThreadList.pop();
			m_leisureThreadList.push(thread);
		} while (thread->isExecuting());
		// 通知线程执行
		thread->assign(task);
		thread->notify();
	}
}
3.5 test

测试代码如下:

#include "threadpool.h"
#include <iostream>
#include <ctime>
#include <fstream>
#include <chrono>

std::mutex mtx;

class myTask : public Task
{
	int m_data;
public:
	myTask(int data)
	{
		m_data = data;
	}
	void run()
	{
		mtx.lock();
		std::cout << "线程:"<< std::this_thread::get_id() << "输出:" << m_data << std::endl;
		mtx.unlock();
	}
};

int main()
{
	std::ofstream f("D:\\temp\\threadPool.txt");
	ThreadPool pool(5);
	for (int i = 0; i < 10000; i++)
	{
		pool.addTask(new myTask(i));
	}
	pool.exit();
	f.close();
	getchar();
	return 0;
}

运行结果如下:

C++11实现线程池

完整源码下载

分类:

技术点:

相关文章: