线性池,一般和网络通信联系到一起实现epoll多路转接,但这里为了强调线程池的生产和添加逻辑,不再结合服务器那一块实现
线程池总结构
typedef struct threadpool_t
{
int max_thr_num;//线程池最大线程数
int shutdown;//标志位,线程使用状态,0或1
////0 线程池不销毁 1 销毁
threadpool_task *queue_task; //任务队列
int queue_front; //队列头下标
int queue_rear; //队列尾下标
int queue_size; //队列中实际任务数
int queue_max_size; //队列可容纳任务数上限
pthread_t *threads; //线程池存放每个线程的tid数组
pthread_cond_t cond; //用于维护队列任务的条件变量
pthread_mutex_t lock; //用于锁住本结构体
}threadpool_t;
任务队列结构
typedef struct threadpool_task_t
{
void *(*func)(void *); //函数指针,回调函数
void *arg; //上面函数的参数
} threadpool_task; //各子线程任务结构体
线程池模块分析:
1.main():
创建线程池
向线程池添加任务,借助回调处理任务
销毁线程池
int main()
{
int i;
//sleep(1);
threadpool_t *pool=threadpool_create(10,10);
for(i=0;i<10;i++)
{
//int threadpool_add(threadpool_t *pool,void *(*func)(void* arg),void *arg)
threadpool_add(pool,func,NULL);
}
sleep(10);
threadpool_destroy(pool);
}
2.threadpool_create():
创建线程池结构体指针(threadpool_t *pool)
初始化线程池结构体(N个成员变量)
创建N个任务线程
pthread_create()最后一个参数传(void *)pool是为了给线程执行函
数threadpool_thread(void* threadpool)传当前初始化好了的结构体
threadpool_t *threadpool_create(int max_thr_num,int queue_max_size)
{
int i;
threadpool_t *pool=(threadpool_t *)malloc(sizeof(threadpool_t));
pool->shutdown=0;
pool->max_thr_num=max_thr_num;
pool->threads=(pthread_t*)malloc(sizeof(pthread_t)*max_thr_num);
pool->queue_task=(threadpool_task*)malloc(sizeof(threadpool_task)*queue_max_size);
pool->queue_size=0;
pool->queue_max_size=queue_max_size;
pool->queue_front=0;
pool->queue_rear=0;
pthread_mutex_init(&(pool->lock),NULL);
pthread_cond_init(&(pool->cond),NULL);
for(i=0;i<max_thr_num;i++)
{
pthread_create(&pool->threads[i],NULL,threadpool_thread,(void*)pool);
//pool指向当前线程池
}
return pool;
}
3.threadpool_thread():线程执行函数
进入子线程回调函数
接受参数void* arg ---> pool结构体
加锁--->lock--->整个结构体锁
判断条件变量---->wait
刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后
再唤醒接收任务
当queue_size==0 或者 pool->shutdown==0说明没有任务,调 wait
阻塞在条件变量上, 若有任务,跳过该while
wait之后
唤醒之后判断是否要销毁线程池
如果不就加锁
获取任务处理回调函数及其参数
利用环形队列机制,实现处理任务,借助头指针挪移%实现
解锁
执行回调函数任务
原型:void *(*func)(void* arg)
task.func(task.arg)
void *threadpool_thread(void* threadpool)
{
threadpool_t *pool=(threadpool_t *)threadpool;
threadpool_task task;
while(1)
{
pthread_mutex_lock(&pool->lock);
while((pool->queue_size==0)&&(!pool->shutdown))
{
// printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
//printf("thread %ld is ready \n",pthread_self());
pthread_cond_wait(&pool->cond,&pool->lock);
//queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while
//printf("thread %ld is weak up \n",pthread_self());
}
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->lock);
pthread_exit(0);
}
else
{
task.func=pool->queue_task[pool->queue_front].func;
task.arg=pool->queue_task[pool->queue_front].arg;
pool->queue_front=(pool->queue_front+1)%pool->queue_max_size;
pool->queue_size--;
pthread_mutex_unlock(&pool->lock);
task.func(task.arg);
//((*task.func)(task.arg));
}
}
//pthread_exit(NULL);
}
4.threadpool_add():添加任务队列函数
总功能:
模拟产生任务,
设置回调函数,处理任务,sleep(3)代表处理完成
内部实现:
加锁
初始化任务队列结构体成员 回调函数func,arg
利用环形队列机制,实现添加任务,借助队尾指针挪移%实现
唤醒阻塞在条件变量上的线程
解锁
int threadpool_add(threadpool_t *pool,void *(*func)(void* arg),void *arg)
{
pthread_mutex_lock(&pool->lock);
//清空工作线程调用的回调函数的参数arg
if(pool->queue_task[pool->queue_rear].arg!=NULL)
{
free(pool->queue_task[pool->queue_rear].arg);
pool->queue_task[pool->queue_rear].arg = NULL;
}
pool->queue_task[pool->queue_rear].func=func;
pool->queue_task[pool->queue_rear].arg=arg;
pool->queue_rear=(pool->queue_rear+1)%pool->queue_max_size;
pool->queue_size++;//向任务队列中添加一个任务
pthread_cond_signal(&pool->cond);
//添加完任务后,队列不为空,唤醒线程池中等待处理任务的线程
pthread_mutex_unlock(&pool->lock);
}
5.func(): 线程池中的线程,模拟处理业务
打印自身线程号
让别的线程有机会抢锁
void *func()
{
printf("pthread id = %ld\n",pthread_self());
sleep(3);
}
6.threadpool_destroy()
void threadpool_destroy(threadpool_t* pool)
{
int i;
printf("开始销毁\n");
if(pool->shutdown) //先判断一下销毁标志是否已经销毁
{
return;
}
pool->shutdown=1;
pthread_mutex_lock(&pool->lock);
pthread_cond_broadcast(&pool->cond);
pthread_mutex_unlock(&pool->lock);
for(i=0;i<pool->max_thr_num;i++)
{
pthread_join(pool->threads[i],NULL);
}
free(pool->threads);
/*if(pool->queue_task)
{
free(pool->queue_task);
}
if (pool->threads)
{
free(pool->threads);
pthread_mutex_lock(&(pool->lock));
pthread_mutex_destroy(&(pool->lock));
pthread_cond_destroy(&(pool->cond));
}
free(pool);
pool = NULL;*/
pthread_mutex_lock(&pool->lock);
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->cond);
free(pool);
printf("销毁完成\n");
}
下面是源码
#include<stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <stdbool.h>
#include <signal.h>
#include <assert.h>
#include <errno.h>
typedef struct threadpool_task_t
{
void *(*func)(void* arg);
void* arg;
}threadpool_task;
typedef struct threadpool_t
{
int max_thr_num;
int shutdown;
threadpool_task *queue_task;
int queue_front;
int queue_rear;
int queue_size;
int queue_max_size;
pthread_t *threads;
pthread_cond_t cond;
pthread_mutex_t lock;
}threadpool_t;
void *threadpool_thread(void* threadpool);
threadpool_t *threadpool_create(int max_thr_num,int queue_max_size);
int threadpool_add(threadpool_t *pool,void *(*func)(void* arg),void *arg);
void threadpool_destroy(threadpool_t* pool);
void *func();
void *threadpool_thread(void* threadpool)
{
threadpool_t *pool=(threadpool_t *)threadpool;
threadpool_task task;
while(1)
{
pthread_mutex_lock(&pool->lock);
while((pool->queue_size==0)&&(!pool->shutdown))
{
// printf("thread 0x%x is waiting\n", (unsigned int)pthread_self());
//printf("thread %ld is ready \n",pthread_self());
pthread_cond_wait(&pool->cond,&pool->lock);
//printf("thread %ld is weak up \n",pthread_self());
}
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->lock);
pthread_exit(0);
}
else
{
task.func=pool->queue_task[pool->queue_front].func;
task.arg=pool->queue_task[pool->queue_front].arg;
pool->queue_front=(pool->queue_front+1)%pool->queue_max_size;
pool->queue_size--;
pthread_mutex_unlock(&pool->lock);
task.func(task.arg);
//((*task.func)(task.arg));
}
}
//pthread_exit(NULL);
}
threadpool_t *threadpool_create(int max_thr_num,int queue_max_size)
{
int i;
threadpool_t *pool=(threadpool_t *)malloc(sizeof(threadpool_t));
pool->shutdown=0;
pool->max_thr_num=max_thr_num;
pool->threads=(pthread_t*)malloc(sizeof(pthread_t)*max_thr_num);
pool->queue_task=(threadpool_task*)malloc(sizeof(threadpool_task)*queue_max_size);
pool->queue_size=0;
pool->queue_max_size=queue_max_size;
pool->queue_front=0;
pool->queue_rear=0;
pthread_mutex_init(&(pool->lock),NULL);
pthread_cond_init(&(pool->cond),NULL);
for(i=0;i<max_thr_num;i++)
{
pthread_create(&pool->threads[i],NULL,threadpool_thread,(void*)pool);
}
return pool;
}
int threadpool_add(threadpool_t *pool,void *(*func)(void* arg),void *arg)
{
pthread_mutex_lock(&pool->lock);
if(pool->queue_task[pool->queue_rear].arg!=NULL)
{
free(pool->queue_task[pool->queue_rear].arg);
pool->queue_task[pool->queue_rear].arg = NULL;
}
pool->queue_task[pool->queue_rear].func=func;
pool->queue_task[pool->queue_rear].arg=arg;
pool->queue_rear=(pool->queue_rear+1)%pool->queue_max_size;
pool->queue_size++;//向任务队列中添加一个任务
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->lock);
}
void threadpool_destroy(threadpool_t* pool)
{
int i;
printf("开始销毁\n");
if(pool->shutdown) //先判断一下销毁标志是否已经销毁
{
return;
}
pool->shutdown=1;
pthread_mutex_lock(&pool->lock);
pthread_cond_broadcast(&pool->cond);
pthread_mutex_unlock(&pool->lock);
for(i=0;i<pool->max_thr_num;i++)
{
pthread_join(pool->threads[i],NULL);
}
free(pool->threads);
/*if(pool->queue_task)
{
free(pool->queue_task);
}
if (pool->threads)
{
free(pool->threads);
pthread_mutex_lock(&(pool->lock));
pthread_mutex_destroy(&(pool->lock));
pthread_cond_destroy(&(pool->cond));
}
free(pool);
pool = NULL;*/
pthread_mutex_lock(&pool->lock);
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->cond);
free(pool);
printf("销毁完成\n");
}
void *func()
{
printf("pthread id = %ld\n",pthread_self());
sleep(3);
}
int main()
{
int i;
sleep(1);
threadpool_t *pool=threadpool_create(10,10);
for(i=0;i<10;i++)
{
threadpool_add(pool,func,NULL);
}
sleep(10);
threadpool_destroy(pool);
}
还有一些扩容和销毁操作,这算是个比较简化的版本,可能在以后了解更深入再过来补充