线程池
意义
在实际应用场景中线程的多次创建销毁会产生不必要的资源消耗。而线程池可以实现线程的多次复用,避免了重复的创建与销毁
思想
线程池由两部分组成,一部分是存放多个线程的线程池,另一部分是存放任务的队列。
线程池中应该有线程与队列最基本的信息
如线程池中能存放线程的最大量与最小量,池中线程的线程号
队列的容量,头节点,尾节点,当前队列中任务数量等等
当有空闲线程时,若任务队列不为空,该线程则会将任务取出执行,若队列为空则阻塞等待
当任务队列为满时,若有空闲线程,则该线程则会将任务取出执行,若无空闲线程则阻塞等待
因此需要空和满两个条件变量
线程池结构体作为全局变量被多个线程共享,为了防止混乱的发生,需要在访问全局变量之前先加锁
为了更好的管理线程池中资源的分配,当空闲线程数量过多任务过少时,需要适当的回收一部分线程。当任务数量过多空闲任务不足时需要创建一些线程。
typedef struct Pthreadpool//线程结构体
{
task *taskQ;
pthread_t *tidwork;//工作线程号
pthread_t tidmanager;//管理者线程号
pthread_cond_t full;//任务队列空
pthread_cond_t empty;//任务队列
pthread_mutex_t mutexpool;//线程池互斥锁
pthread_mutex_t mutexbusy;//忙线程互斥锁
int queueFornt;//头
int queueRear;//尾
int queuesize;//当前任务队列中的任务数
int queueCapacity;//容量
int minNum;//最小线程数
int maxNum;//最大线程数
int busyNum;//忙线程
int liveNum;//存活线程数
int exitNum;//要杀死的线程个数
int flag;//是否销毁线程池
}Pthreadpool;
typedef struct task
{
void(*function)(void *arg);
void *arg;
}task;
线程池的创建
根据结构体中的内容对其一一初始化处理
Pthreadpool * create_pool(int num,int max,int min)
{
Pthreadpool *pool=(Pthreadpool*)malloc(sizeof(Pthreadpool));
if(pool==NULL){
printf("创建线程池失败\n");
return 0;
}
pool->tidwork=(pthread_t*)malloc(sizeof(pthread_t)*max);
if(pool->tidwork==NULL){
free(pool);
printf("线程号数组创建失败\n");
return 0;
}
memset(pool->tidwork,0,sizeof(pthread_t)*max);
pool->queueCapacity=num;
pool->maxNum=max;
pool->minNum=min;
pool->busyNum=0;
pool->liveNum=min;
pool->exitNum=0;
pool->taskQ=malloc(sizeof(task)*num);
if(pool->taskQ==NULL){
free(pool->tidwork);
free(pool);
printf("线程号数组创建失败\n");
return 0;
}
pool->queueCapacity=num;
pool->queueFornt=0;
pool->queueRear=0;
pool->queuesize=0;
pthread_create(&pool->tidmanager,NULL,manarge,pool);
if (pthread_mutex_init(&pool->mutexpool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexbusy, NULL) != 0 ||
pthread_cond_init(&pool->empty, NULL) != 0 ||
pthread_cond_init(&pool->full, NULL) != 0)
{
free(pool->tidwork);
free(pool);
printf("mutex or condition init fail...\n");
}
int i;
pool->flag=0;
for(i=0;i<min;i++){
pthread_create(&pool->tidwork[i],NULL,worker,pool);
pthread_detach(pool->tidwork[i]);//释放线程,待其结束后系统自动回收
}
return pool;
}
添加任务
void task_push(Pthreadpool* pool,void (*function)(void *arg), void *arg)
{
pthread_mutex_lock(&pool->mutexpool);
while(pool->queuesize==pool->queueCapacity&&!pool->flag){
//当任务队列为满,且未被标记为销毁时阻塞等待
pthread_cond_wait(&pool->full,&pool->mutexpool);
}
if(pool->flag==1){
//若被标记为消毁直接解锁退出
pthread_mutex_unlock(&pool->mutexpool);
return;
}
pool->taskQ[pool->queueRear].function=function;//添加任务函数与参数
pool->taskQ[pool->queueRear].arg=arg;
pool->queueRear=(pool->queueRear+1)%pool->queueCapacity;
pool->queuesize++;
pthread_cond_signal(&pool->empty);//此时任务队列中已有任务唤醒工作线程
pthread_mutex_unlock(&pool->mutexpool);
}
执行任务
void *worker (void* arg)
{
Pthreadpool*pool=(Pthreadpool*)arg;
while(1){
pthread_mutex_lock(&pool->mutexpool);
// 当前任务队列是否为空
while (pool->queuesize == 0 && !pool->flag)
{
// 阻塞工作线程
pthread_cond_wait(&pool->empty, &pool->mutexpool);
// 判断是不是要销毁线程
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
//pool->exitNum--;不能将该语句写在此处,若管理者线程连续唤醒多次该线程但条件一直未成立,exitNum仍然大于0。在这种情况下当有生产者线程将其唤醒时,就会在此处退出。
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexpool);
threadExit(pool);
}
}
}
// 判断线程池是否被关闭了
if (pool->flag)
{
pthread_mutex_unlock(&pool->mutexpool);
threadExit(pool);
}
task t;
t.function=pool->taskQ[pool->queueFornt].function;
t.arg=pool->taskQ[pool->queueFornt].arg;
pool->queueFornt=(pool->queueFornt+1)%pool->queueCapacity;
pool->queuesize--;
pthread_cond_signal(&pool->full);//通知任务生产函数生产任务
pthread_mutex_unlock(&pool->mutexpool);
printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexbusy);//忙线程+1
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexbusy);
t.function(t.arg);//执行任务
t.arg==NULL;
pthread_mutex_lock(&pool->mutexbusy);//任务完成后忙线程-1
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexbusy);
}
}
管理者函数
void *manarge (void* arg)
{
Pthreadpool *pool=(Pthreadpool *)arg;
while (!pool->flag)
{
sleep(3);//每隔一段时间查询一次线程池中线程的情况
pthread_mutex_lock(&pool->mutexpool);
int queuesize=pool->queuesize;//得到当前队列中任务的数量
int liveNum=pool->liveNum;//得到当前存活线程数量
pthread_mutex_unlock(&pool->mutexpool);
pthread_mutex_lock(&pool->mutexbusy);
int busyNum=pool->busyNum;//得到当前忙线程数量
pthread_mutex_unlock(&pool->mutexbusy);
//当任务多于存活线程数量时创建线程
if(queuesize>liveNum&&liveNum<pool->maxNum){
pthread_mutex_lock(&pool->mutexpool);
int count=0,i;
for(i=0; i < pool->maxNum && count < NUMBER && pool->liveNum < pool->maxNum; ++i){
if(pool->tidwork[i]==0){
pthread_create(&pool->tidwork[i],NULL,worker,pool);
count++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexpool);
}
//当空闲线程过多时唤醒空闲线程使其退出
if(busyNum*2<liveNum&&liveNum>pool->minNum){
pthread_mutex_lock(&pool->mutexpool);
pool->exitNum=NUMBER;
pthread_mutex_unlock(&pool->mutexpool);
for(int i = 0; i < NUMBER; i++){
pthread_cond_signal(&pool->empty);
}
}
}
}
线程退出函数
void *threadExit(Pthreadpool *pool)
{
pthread_t tid=pthread_self();//获取当前线程的线程号
int i;
for(i=0;i<pool->maxNum;i++){
if(pool->tidwork[i] == tid){
//遍历工作线程号数组找到要销毁的线程
pool->tidwork[i]=0;//将其在线程号数组中的值还原为0
printf("threadExit() called, %ld exiting\n", tid);
break;
}
}
pthread_exit(NULL);
}
线程销毁
void destroy_pool(Pthreadpool * pool)
{
int i;
if(pool==NULL){
return ;
}
pool->flag=1;
pthread_join(pool->tidmanager,NULL);
for(i=0;i<pool->liveNum;i++){
pthread_cond_signal(&pool->empty);
}
if(pool->taskQ){
free(pool->taskQ);
}
if(pool->tidwork){
free(pool->tidwork);
}
pthread_mutex_destroy(&pool->mutexpool);
pthread_mutex_destroy(&pool->mutexbusy);
pthread_cond_destroy(&pool->empty);
pthread_cond_destroy(&pool->full);
free(pool);
pool = NULL;
return ;
}