在需要频繁开线程时,创建和销毁线程会话费大量时间,为了提高效率,我们可以在任务开始前,先创建一定数量的线程。这样在接收到任务时,就可以直接使用线程池中处于wait状态的线程,在任务结束后线程回到wait状态,等待新任务的到来,这就避免了线程的创建与销毁,从而提高程序执行效率。
所需数据
- 需要存储有多少线程( int thread_number )
- 需要开辟对应的数组,存储线程号( pthread_t *threads )
- 需要一个任务队列来存储未执行的任务,便于线程竞争任务并执行( task_queue )
- 需要一个flag来标记线程池是否结束,该标记可以在线程池结束后唤醒所有处于等待线程的线程,让它们可以正常退出(其中,所有线程处于脱离(detach)状态)
- 互斥锁与条件变量,用于避免在获取与添加任务时发生错误(同步与互斥)
运行流程
- 执行ThreadPool的构造函数,初始化有关数据,进行线程的创建,并将线程进行脱离(使线程在运行完后可以自动回收)
- 创建的线程会去执行工作线程,如果任务队列为空,则一直while循环等待,直到被唤醒(signal)去竞争任务。若竞争到任务,则去执行,执行完后操作与前面相同;若没有竞争到任务,则回到wait状态
- 有新任务到来,接收到信号的程序会给线程池的任务队列添加信息,线程池便会唤醒处于wait的线程执行任务,或直接被刚执行完任务的线程继续执行(由于是任务队列,所以只有处于队首的任务会被执行)
代码实现(c++,后面会有对代码的分析)
源代码请看我的Github
封装线程池的类
// threadPool.h
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
#include "locker.h" // 该头文件见文章最后 或 Github
#include <queue>
using namespace std;
template< typename T >
class ThreadPool {
private:
int thread_number; // 线程池的线程数
pthread_t *threads; // 线程数组
queue<T *> task_queue; // 任务队列
MutexLocker queue_mutex_locker; // 任务队列的互斥锁
Cond queue_cond_locker; // 任务队列的条件变量
bool m_stop; // 是否结束线程
public:
ThreadPool( int thread_num = 20 );
~ThreadPool();
bool append( T *task ); // 向任务队列添加任务
private:
static void *worker( void * ); // 工作线程
void run(); // 线程池中线程开始运行的函数
T *getTask(); // 从任务队列中获取队首的任务
};
template< typename T >
ThreadPool<T>::ThreadPool( int thread_num ) :thread_number(thread_num),
threads(NULL), m_stop(false) {
if( thread_number < 0 ) {
cout << "thread_number < 0\n";
throw exception();
}
// 创建数组存放线程号
threads = new pthread_t[ thread_number ];
if( !threads ) {
cout << "threads is NULL\n";
throw exception();
}
// 创建规定数量的线程
for( int i = 0; i < thread_number; i++ ) {
// 由于pthread_create第三个参数必须指向静态函数,要使用类成员函数和变量,只能通过:
// 1) 类的静态对象
// 2) 将类的对象作为参数传给静态函数
// 这里通过第二种方法实现
if( pthread_create( &threads[i], NULL, worker, this ) ) { // 成功返回0
delete[] threads; // 创建失败则释放所有已分配的空间
cout << "pthread_create error\n";
throw exception();
}
// 将线程进行脱离,线程运行完后自动回收,避免使用主线程进行join等待其结束
if( pthread_detach( threads[i] ) ) {
delete[] threads;
cout << "pthread_detach error\n";
throw exception();
}
}
}
// 析构函数中,将m_stop置true,此时将阻塞中的所有线程唤醒
// 由于 !m_stop 为false,线程会退出循环,线程结束被回收( 详见函数run() )
// 若不唤醒线程,则在程序退出后,线程非正常结束,同时会导致
template< typename T >
ThreadPool<T>::~ThreadPool() {
delete[] threads;
m_stop = true;
queue_cond_locker.broadcast();
}
/* 添加任务时需要先上锁,并判断队列是否为空 */
template< typename T >
bool ThreadPool<T>::append( T *task ) {
queue_mutex_locker.mutex_lock();
bool need_signal = task_queue.empty(); // 记录添加任务之前队列是否为空
task_queue.push( task );
queue_mutex_locker.mutex_unlock();
// 如果添加任务之前队列为空,即所有线程都在wait,所以需要唤醒某个线程
if( need_signal ) {
queue_cond_locker.signal();
}
return true;
}
// 线程函数,调用run()来使线程开始工作
template< typename T >
void * ThreadPool<T>::worker( void *arg ) {
ThreadPool *pool = ( ThreadPool * )arg;
pool->run();
return pool;
}
// 获取处于队首的任务,获取时需要加锁,避免发生错误
// 若队列为空,则返回NULL,该线程成为等待状态(详见函数run())
template< typename T >
T* ThreadPool<T>::getTask() {
T *task = NULL;
queue_mutex_locker.mutex_lock();
if( !task_queue.empty() ) {
task = task_queue.front();
task_queue.pop();
}
queue_mutex_locker.mutex_unlock();
return task;
}
template< typename T >
void ThreadPool<T>::run() {
while( !m_stop ) { // 当线程池没有结束时,线程循环获取任务进行执行
T *task = getTask();
if( !task ) {
queue_cond_locker.wait(); // 队列为空,线程开始等待
} else {
task->doit(); // 开始执行任务
delete task; //task指向的对象在WebServer中new出来,因此需要手动delete
}
}
}
#endif
代码分析
首先需要注意,threadpool的实现使用了模板类,这样就需要把类的定义与成员函数的实现放在一个文件里,因为成员函数不能单独编译。
关于m_stop的作用:可以发现m_stop的初值为false,这样线程就可以一直循环等待执行任务( 见ThreadPool::run()成员函数 ),但是m_stop的值改变的位置在析构函数中,线程池都退出了,改变m_stop的值还有什么用呢?其实,在将m_stop的值改变后,又调用了broadcast()来唤醒所有线程,这样所有被阻塞的线程就会开始执行,run()方法的循环条件是
while( !m_stop )
,这样其循环会被破坏,进而线程结束,被自动回收。// PS:然而只要主线程(暂且就这样叫了)退出( 不是通过pthread_exit()退出 ),那么所有线程就会被强制终止,同样也不会正常退出,所以线程池析构函数中唤醒所有线程并不会有什么作用,可能被唤醒的线程仍会在主线程结束前未执行完,仍为非正常退出。可以通过sleep()来环节,不过这样仍不会对正在执行任务的线程有很好的作用,因为任务执行时间未知。(可以考虑在析构函数中使用join()等待线程结束 )在任务队列进行添加任务、取任务、删除任务时,为了避免多个线程对任务队列同时操作,需要在进行修改时将任务队列加锁
由于pthread_create第三个参数必须指向静态函数,要使用类成员函数和变量,只能通过:
1) 类的静态对象
2) 将类的对象作为参数传给静态函数
这里通过第二种方法实现,将对象本身作为参数传到线程工作函数,在线程函数中进行类型强转获取调用对象中存储的数据。在从任务队列获取任务时:如果任务队列非空,就加锁获得队首任务,再将队首出队,将任务返回即可;如果任务队列为空,即无法获取任务,就会返回NULL,那么run()函数对应就会进入else后的语句,将线程进行阻塞,等待下一个任务的到来。
关于在run() 中
delete task;
的说明:由于所有任务都是webServer类中使用new创建的(详见我的Github),因为如果不适用new的话,在将任务加入队列后,webServer.cpp中任务会出作用域,进行析构,那么任务可能会在执行完之前被析构,造成错误,所以使用new创建。然而,new创建的对象不会自动释放,只能手动delete,因此在任务结束后进行delete,这样才能使其资源释放,同事可以调用Task的析构函数,关闭该任务对应的连接。
附:头文件locker.h
封装互斥锁与条件变量的类
// locker.h
#ifndef _LOCKER_H_
#define _LOCKER_H_
#include <iostream>
#include <exception>
#include <pthread.h>
using namespace std;
/* 线程锁 */
class MutexLocker {
private:
pthread_mutex_t m_mutex;
public:
MutexLocker() { //初始化
if( pthread_mutex_init( &m_mutex, NULL ) ) {
cout << "mutex init errror __ 1\n";
throw exception();
}
}
~MutexLocker() {
pthread_mutex_destroy( &m_mutex );
}
bool mutex_lock() {
return pthread_mutex_lock( &m_mutex ) == 0;
}
bool mutex_unlock() {
return pthread_mutex_unlock( &m_mutex );
}
};
/* 条件变量 */
class Cond {
private:
pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
public:
Cond() {
if( pthread_mutex_init( &m_mutex, NULL ) ) {
throw exception();
}
if( pthread_cond_init( &m_cond, NULL ) ) {
pthread_cond_destroy( &m_cond );
throw exception();
}
}
~Cond() {
pthread_mutex_destroy( &m_mutex );
pthread_cond_destroy( &m_cond );
}
// 等待条件变量,cond与mutex搭配使用,避免造成共享数据的混乱
bool wait() {
pthread_mutex_lock( &m_mutex );
int ret = pthread_cond_wait( &m_cond, &m_mutex );
pthread_mutex_unlock( &m_mutex );
return ret == 0;
}
// 唤醒等待该条件变量的某个线程
bool signal() {
return pthread_cond_signal( &m_cond ) == 0;
}
// 唤醒所有等待该条件变量的线程
bool broadcast() {
return pthread_cond_broadcast( &m_cond ) == 0;
}
};
#endif