线程池,在我看来就是一个 线程链表,接着我们通过任务链表(其实应该是队列)在上面分配一些任务函数。
在实际应用中,线程池可以和网络服务器等高并发的程序结合起来实现异步操作。
线程池核心结构
typedef struct my_pthread_t
{
pthread_t *_pthread; //线程号
bool state; //是否已经进入等待状态
}my_pthread_t;
typedef struct trdpool_t{
my_pthread_t *phead; /*其实这里可以直接用下面的pthread_t*取代(主流做法),但是因为在学习的过程中,遇见了点小问题,这里做了修改,用途后见*/
// pthread_t *phead;
work_t *whead; //任务链表的头
pthread_cond_t cond; //用于维护任务链表的条件变量
pthread_mutex_t mutex; //用于锁定条件变量的互斥锁
bool status; //线程池状态: 0 线程池不销毁 1 销毁
int num; //线程池中的线程数量
}trdpool_t;
任务链表结构
typedef void Fun(void*);
typedef struct work_t{
Fun* fun; //任务函数
void*arg; //任务函数的参数
struct work_t*next; //任务链表的下一个任务的指针
}work_t;
main 流程
int main(int argc,char**argv)
{
prdpool_init(5); //在线程池创建5个线程
for(int i=0;i<10;i++)
add_work(); //添加10个任务
sleep(10); //这里是为了看效果,延缓下销毁
// while(1);
prdpool_destory(); //销毁线程池
return 0;
}
prdpool_init(int)
void prdpool_init(int num)
{
trdpool=malloc(sizeof(trdpool_t)); //动态内存记得free
pthread_mutex_init(& trdpool->mutex,NULL); //初始化锁
pthread_cond_init(& trdpool->cond,NULL); //初始化条件变量
trdpool->status=0; //初始化状态为未销毁
trdpool->phead=malloc(sizeof(my_pthread_t)*num); //要求分配num个线程
trdpool->num=num;
for(int i=0;i<num;i++){
??? trdpool->phead[i]._pthread=malloc(sizeof(pthread_t)); //动态分配线程的内存空间
trdpool->phead[i].state=0; //这个是代表线程尚未进入休眠状态,并不必要
bool*pthread_arg=&(trdpool->phead[i].state); //这个是传给线程的参数
pthread_create(trdpool->phead[i]._pthread,NULL,start_routine,(void*)pthread_arg);
//创建线程
lblready:{
if(trdpool->phead[i].state==0){
//如果这个时候线程在线程函数中
//尚未进入等待状态就再睡一会儿
usleep(400);
goto lblready;
}
}
}
}
voidstart_routine(voidarg) 线程执行函数
void*start_routine(void*arg)
{
bool*state=arg; //提取出参数,这里我就只用了前面提到的trdpool->phead[i].state,也就是
//线程是否进入等待状态的标志
work_t*work;
while(1){
pthread_mutex_lock(&trdpool->mutex); //给条件变量加锁,同时也算是给任务链表加锁
while( trdpool->whead==NULL&& trdpool->status==0){ //使用while的原因,之后条件变量改变之后会有抢锁,抢到锁的线程从任务队头拿走任务,则任务链表可能变空,而此时抢到锁的线程释放锁则让唤醒的线程继续睡眠。 后面那个status条件表明线程池尚未被摧毁
fprintf(stdout,"thread %ld is ready \n",pthread_self());
*state=1; //表示线程要去睡眠了
pthread_cond_wait(& trdpool->cond,& trdpool->mutex);
//唤醒俩条件1.有任务2.线程池释放摧毁信号
fprintf(stdout,"thread %ld is weak up \n",pthread_self());
}
if( trdpool->status==0){ //如果不是要摧毁线程池
work= trdpool->whead; // 取出任务
trdpool->whead= trdpool->whead->next; //任务链表去除任务
pthread_mutex_unlock(& trdpool->mutex); //解锁
(work->fun)(work->arg); //执行任务函数
free(work); //释放动态分配的任务
}else{
pthread_mutex_unlock(& trdpool->mutex); //如果是想摧毁线程池
pthread_exit(NULL); //这里就线程退出就好了
}
}
}
void add_work() 添加任务
void add_work()
{
work_t* new_work=malloc(sizeof(work_t));
work_t* old_work;
new_work->arg=NULL;
new_work->fun=print_pthread_self;
new_work->next=NULL;
//设置任务的有关信息
pthread_mutex_lock(& trdpool->mutex);//锁住任务链表
//添加任务
if( trdpool->whead!=NULL){
old_work= trdpool->whead;
trdpool->whead=new_work;
new_work->next=old_work;
}else{
trdpool->whead=new_work;
}
pthread_cond_signal(& trdpool->cond); //通知唤醒等待阻塞的线程
pthread_mutex_unlock(& trdpool->mutex); //解锁
}
void prdpool_destory()
void prdpool_destory()
{
puts("the trdpool begin to destory !");
work_t *work;
trdpool->status=1; //标记线程池需要摧毁
pthread_mutex_lock(& trdpool->mutex); //加锁
pthread_cond_broadcast(& trdpool->cond); //广播唤醒等待中的所有线程
pthread_mutex_unlock(& trdpool->mutex); //解锁
for(int i=0;i< trdpool->num;i++){
pthread_join(*(trdpool->phead[i]._pthread),NULL);//这里其实是阻塞等待每个线程
free(trdpool->phead[i]._pthread);
}
free( trdpool->phead);
work=trdpool->whead;
while(trdpool->whead){ //释放每一个任务
trdpool->whead=trdpool->whead->next;
free(work);
work=trdpool->whead;
}
pthread_cond_destroy(& trdpool->cond); //释放条件变量
pthread_mutex_lock(& trdpool->mutex); //保证没人用锁
pthread_mutex_destroy(& trdpool->mutex); //释放锁
free(trdpool);
puts("the trdpool destory !");
}
下面是我写的源码-------->
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include<sys/mman.h>
#include<pthread.h>
#include<semaphore.h>
#include <netinet/in.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<sys/select.h>
#include<sys/epoll.h>
#include<unistd.h>
#include<dirent.h>
#include<pwd.h>
#include<grp.h>
#include<time.h>
#include<errno.h>
#include<signal.h>
#include<fcntl.h>
#include<sys/wait.h>
#include<stdbool.h>
typedef void Fun(void*);
typedef struct work_t{
Fun* fun;
void*arg;
struct work_t*next;
}work_t;
typedef struct my_pthread_t
{
pthread_t *_pthread;
bool state;
}my_pthread_t;
typedef struct trdpool_t{
my_pthread_t *phead;
// pthread_t *phead;
work_t *whead;
pthread_cond_t cond;
pthread_mutex_t mutex;
bool status; //0 线程池不销毁 1 销毁
int num;
}trdpool_t;
trdpool_t *trdpool=NULL;
void print_pthread_self(void*arg)
{
printf("pthread id = %ld\n",pthread_self());
sleep(2);
return ;
}
void*start_routine(void*arg)
{
bool*state=arg;
work_t*work;
while(1){
pthread_mutex_lock(&trdpool->mutex);
while( trdpool->whead==NULL&& trdpool->status==0){ //一个小朋友把蛋糕抢走了,那么其他小朋友醒来再继续睡。
fprintf(stdout,"thread %ld is ready \n",pthread_self());
*state=1;
pthread_cond_wait(& trdpool->cond,& trdpool->mutex);
fprintf(stdout,"thread %ld is weak up \n",pthread_self());
}
if( trdpool->status==0){
work= trdpool->whead;
trdpool->whead= trdpool->whead->next;
pthread_mutex_unlock(& trdpool->mutex);
(work->fun)(work->arg);
free(work);
}else{
pthread_mutex_unlock(& trdpool->mutex);
pthread_exit(NULL);
}
}
}
void add_work()
{
work_t* new_work=malloc(sizeof(work_t));
work_t* old_work;
new_work->arg=NULL;
new_work->fun=print_pthread_self;
new_work->next=NULL;
pthread_mutex_lock(& trdpool->mutex);
if( trdpool->whead!=NULL){
old_work= trdpool->whead;
trdpool->whead=new_work;
new_work->next=old_work;
}else{
trdpool->whead=new_work;
}
pthread_cond_signal(& trdpool->cond);
pthread_mutex_unlock(& trdpool->mutex);
}
void prdpool_init(int num)
{
trdpool=malloc(sizeof(trdpool_t));
pthread_mutex_init(& trdpool->mutex,NULL);
pthread_cond_init(& trdpool->cond,NULL);
trdpool->status=0;
trdpool->phead=malloc(sizeof(my_pthread_t)*num);
trdpool->num=num;
for(int i=0;i<num;i++){
trdpool->phead[i]._pthread=malloc(sizeof(pthread_t)*num);
trdpool->phead[i].state=0;
bool*pthread_arg=&(trdpool->phead[i].state);
pthread_create(trdpool->phead[i]._pthread,NULL,start_routine,(void*)pthread_arg);
lblready:{
if(trdpool->phead[i].state==0){
usleep(400);
goto lblready;
}
}
}
}
void prdpool_destory()
{
puts("the trdpool begin to destory !");
work_t *work;
trdpool->status=1;
pthread_mutex_lock(& trdpool->mutex);
pthread_cond_broadcast(& trdpool->cond);
pthread_mutex_unlock(& trdpool->mutex);
for(int i=0;i< trdpool->num;i++){
pthread_join(*(trdpool->phead[i]._pthread),NULL);
free(trdpool->phead[i]._pthread);
}
free( trdpool->phead);
work=trdpool->whead;
while(trdpool->whead){
// puts("!");
trdpool->whead=trdpool->whead->next;
free(work);
work=trdpool->whead;
}
pthread_cond_destroy(& trdpool->cond);
pthread_mutex_lock(& trdpool->mutex);
pthread_mutex_destroy(& trdpool->mutex);
free(trdpool);
puts("the trdpool destory !");
}
int main(int argc,char**argv)
{
prdpool_init(5);
// sleep(1);
for(int i=0;i<10;i++)
add_work();
sleep(10);
// while(1);
prdpool_destory();
return 0;
}
adl@adl:~/桌面/linux$ ./a.out
thread 140704759068416 is ready
thread 140704750675712 is ready
thread 140704672118528 is ready
thread 140704663725824 is ready
thread 140704655333120 is ready
thread 140704759068416 is weak up
pthread id = 140704759068416
thread 140704655333120 is weak up
pthread id = 140704655333120
thread 140704663725824 is weak up
pthread id = 140704663725824
thread 140704750675712 is weak up
pthread id = 140704750675712
thread 140704672118528 is weak up
pthread id = 140704672118528
pthread id = 140704759068416
pthread id = 140704655333120
pthread id = 140704663725824
pthread id = 140704750675712
pthread id = 140704672118528
thread 140704759068416 is ready
thread 140704655333120 is ready
thread 140704663725824 is ready
thread 140704750675712 is ready
thread 140704672118528 is ready
the trdpool begin to destory !
thread 140704750675712 is weak up
thread 140704655333120 is weak up
thread 140704663725824 is weak up
thread 140704672118528 is weak up
thread 140704759068416 is weak up
the trdpool destory !
以上是我写的简单线程池,其实在写的过程中遇到过些问题,但有在学长和热心网友的帮助下学到了许多。
挂论坛上的链接
参考了:
学长博客
大家可以去看这个是比较好的:
详细的线程池实现