线程池是什么?
所谓线程池就是一个池子,池子里面充满线程,这些线程处于阻塞状态,当有任务需要创建线程执行的时候,会唤醒线程池中的一个线程,被唤醒的线程执行当这个任务,执行完后,又回到线程池阻塞,等待下次唤醒。等待执行的任务会形成任务队列,任务多于线程的时候(就是说线程不够用的时候),按照一定的次序出队,再交由线程执行。
为什么需要线程池?
试想这样一个情景,单个任务处理事件很短,但是请求数目很大很大,如果我们针对每个任务创建一个线程去解决问题的话,固然可以解决,但是系统在创建线程、销毁线程中的开销是很大的,究竟有多大我们通过一个例子来看一下:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
void *f(void *arg) {
int a = 0;
pthread_exit(NULL);
}
int main()
{
pthread_t tid;
time_t sum = 0;
time_t begin = clock();
for(int i = 0;i < 1000;++i) {
time_t pthread_begin = clock();
pthread_create(&tid,NULL,f,NULL);
time_t pthread_end = clock();
sum += pthread_end - pthread_begin;
}
time_t end = clock();
printf("time = %ld\n",end-begin);
printf("invaildtime = %ld\n",sum);
return 0;
}
运行结果:
time = 30253
invaildtime = 29612
由此观之,这个程序98%时间都用在创建新线程上面,所以针对上面的任务简单但是量大的情况,每个任务新创建一个线程真的是效率太低了,所以有了线程池,维护一定数量的线程不用频繁的创建和销毁线程。
使用线程池的效率究竟能高多少?
int main()
{ time_t begin = clock();
int *tasknum = (int *)malloc(sizeof(int)*100000);
for(int i = 0;i<100000;i++) {
pthread_t tid;
tasknum[i] = i;
pthread_create(&tid,NULL,func,NULL);
}
sleep(5);
free(tasknum);
time_t end = clock();
printf("time = %ld\n",end-begin);
return 0;
}
不使用用线程池执行上述程序,运行结果是:time = 890837
int main(){
time_t begin = clock();
threadpool_init(800);
//向线程池中投入10个任务
int *tasknum = (int *)malloc(sizeof(int)*100000);
for(int i = 0;i<100000;i++) {
tasknum[i] = i;
pool_add_task(func,&tasknum[i]);
}
sleep(5);
threadpool_destroy();
free(tasknum);
time_t end = clock();
printf("time = %ld\n",end-begin);
return 0;
}
800个线程的线程池的运行结果是time = 546816,这还只是100000数据,而且线程池的线程数量和性能也有关系,800可能不是最优的。
线程池的工作机理
C语言实现线程池
c语言是实现也很简单只要四个函数:初始化线程池,任务接口函数,向任务队列加任务,销毁线程池。
数据类型
首先是用到的数据类型,任务队列其实是在这使用一条链表实现的
所以头结点是这样的
typedef struct thread_pool
{
pthread_mutex_t queue_lock;
pthread_cond_t queue_cond;
task_t* queue_head;
int shutdown;
pthread_t *threadid;
int max_thread_num;
int cur_queue_size;
}thread_pool;
任务小节点
typedef struct task
{
void *(*process) (void*arg);//函数指针,指向任务
void *arg; //回调函数的参数
struct task *next; //指向下一个任务
}task_t;
这里说一下他的前两个成员,process表示要加入任务队列的函数的名字,
void *(process) (voidarg)是函数指针声明,第二个成员arg就是函数参数。
伪代码
void init()
{
/一些初始化**/
malloc申请头指针
初始化锁mutex初始化cond;
pool->maxsize = ;
pool->task = NULL;//任务队列初始化为空
…等初始化操作
/*******循环创建线程******/
for()循环创建一定数量的线程{
每个线程创建时的线程函数是任务接口函数routine
}
}
void routine()
{
/ 死循环保持所有线程执行完任务后继续阻塞 */
while(1){
lock加锁
/* 判断:如果任务队列不为空,不用阻塞直接执行 */
/* 用while代替if防止虚假唤醒 */
while(任务数量==0 && 销毁标志不为真){
基于条件变量cond阻塞;
}
从任务队列取出任务(此时持有锁);
任务队列做相应变化(出队,数量--);
解锁
执行任务,执行完后return之后仍然回到while(1)里
}
}
void add_task(要执行的函数指针,函数参数)
{
构造并初始化一个任务队列的 任务小节点;
lock加锁;
将任务加入任务队列(类似单链表尾插法);
解锁;
唤醒一个线程去执行相应任务;
}
void destory()
{
销毁标志置为真值;
唤醒所有线程;
for(i < max)循环销毁所有线程
join阻塞主线程结束线程池中的线程
free释放init中申请的头指针
while(cur != NULL)遍历任务队列
销毁任务队列
销毁锁,销毁条件变量
}
四个函数
初始化一个线程池就是通过一个for循环创建一定数量的线程,每个线程创建时候的线程函数是任务接口函数,任务接口函数中通过一个while(1)死循环把所有的线程困在里面,当没有任务唤醒时,线程阻塞在线程池中。有任务唤醒时候,线程通过任务小节点中的函数指针跳转到相应的函数执行任务,结束后仍然被困在死循环中,直到shutdown变为1才会销毁。
在一个就是添加任务函数,与链表添加节点无异。
最后就是释放所有的线程,就是线程销毁函数,需要注意的是如果任务队列还没清空就销毁线程池,就会导致错误。
再者,像前面的向任务队列添加任务等等操作,都需要是原子操作(就是说添加节点的时候,链表不能被其他线程操作而变化),所以就需要互斥锁和条件变量配合使用来保证原子操作。
源码
threadpool.h
#ifndef _THREADHPOOL_H_
#include <threads.h>
#include <stdlib.h>
#include <unistd.h>
typedef struct task{
void *(*process) (void*arg);//函数指针,指向任务
void *arg; //回调函数的参数
struct task *next; //指向下一个任务
}task_t;
typedef struct thread_pool{
pthread_mutex_t queue_lock;
pthread_cond_t queue_cond;
task_t* queue_head;
int shutdown;
pthread_t *threadid;
int max_thread_num;
int cur_queue_size;
}thread_pool;
void threadpool_init(int max_thread_num);
int threadpool_destroy();
void *thread_routine();
int pool_add_task(void*(*process)(void*arg),void *arg);
#endif
threadpool.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h>
#include "threadpool2.h"
#include <pthread.h>
static thread_pool* pool = NULL;
//线程池中加入任务
int pool_add_task(void*(*process)(void*arg),void *arg)
{
int ret = 0;
//构造一个新的任务小节点
task_t *new_task = (task_t *)malloc(sizeof(task_t));
new_task->process = process;
new_task->arg = arg;
new_task->next = NULL;//初始化为空
pthread_mutex_lock(&(pool->queue_lock));
//将任务加入到等待队列中
task_t * cur_task = pool->queue_head;
if(cur_task != NULL)
{
while(cur_task->next != NULL)
cur_task = cur_task->next;
cur_task->next = new_task;
}
else
{
pool->queue_head = new_task;
}
//连接新任务
if(pool->queue_head == NULL)
{
ret = -1;
printf("func pool_add_worker err:%d",ret);
return ret;
}
pool->cur_queue_size++;
pthread_mutex_unlock(&(pool->queue_lock));
pthread_cond_signal(&(pool->queue_cond));
return ret;
}
//任务接口函数,子线程统一调用这个函数,这个函数检查任务队列中的任务
void *thread_routine(void *arg)
{
/* printf("线程[0x%lx]加入线程池\n",pthread_self()); */
//死循环保持所有线程
while(1)
{
pthread_mutex_lock (&(pool->queue_lock));//给线程加锁
//无任务状态和不销毁时,线程阻塞等待
while(pool->cur_queue_size == 0 && pool->shutdown != 1)
{
/* printf("线程[0x%lx]正在等待\n",pthread_self()); */
pthread_cond_wait(&(pool->queue_cond),&(pool->queue_lock));//基于条件变量阻塞
}
if(pool->shutdown)//线程池要销毁
{
//先解锁后结束
pthread_mutex_unlock(&(pool->queue_lock));
/* printf("线程[0x%lx]将要销毁\n",pthread_self()); */
pthread_exit(NULL);
}
/* printf("线程[0x%lx]将要执行任务\n",pthread_self()); */
//断言函数
assert(pool->cur_queue_size != 0);
assert(pool->queue_head != NULL);
//任务队列长度-1,取出队首节点就是要执行的任务
pool->cur_queue_size--;
task_t * cur_task = pool->queue_head;
pool->queue_head = cur_task->next;
//解锁
pthread_mutex_unlock(&(pool->queue_lock));
(*(cur_task->process))(cur_task->arg);//函数指针
free(cur_task);
cur_task = NULL;
}
printf("线程[0x%lx]异常退出线程池\n",pthread_self());
}
//线程池初始化
//pool表示指向头结点的一个指针,max_thread_num表示线程池中最大的线程数
void threadpool_init(int max_thread_num)
{
//对头指针的初始化
pool = (thread_pool*)malloc(sizeof(thread_pool));
pthread_mutex_init(&(pool->queue_lock),NULL);
pthread_cond_init(&(pool->queue_cond),NULL);
pool->queue_head = NULL;
pool->max_thread_num = max_thread_num;
pool->cur_queue_size = 0;
pool->shutdown = 0;
pool->threadid = (pthread_t *)malloc(max_thread_num*sizeof(pthread_t));
//thread_t 表示线程的标示
for(int i = 0;i<max_thread_num;i++)
{
pthread_create(&(pool->threadid[i]),NULL,thread_routine,NULL);
}
}
//销毁线程池中的所有线程,清空任务队列
//pool指向头节点的指针
int threadpool_destroy()
{
int ret = 0;
if(pool->shutdown)
{
ret = -1;
printf("多次销毁线程池:%d\n",ret);
return ret;//防止多次调用
}
pool->shutdown = 1;
//唤醒所有线程,以便于销毁
pthread_cond_broadcast(&(pool->queue_cond));
for(int i = 0;i<pool->max_thread_num;i++)
{
//阻塞主线程,结束这些线程
pthread_join((pool->threadid[i]),NULL);
}
free(pool->threadid);
//销毁任务队列
task_t *cur = NULL;//辅助指针遍历整个链表
while(pool->queue_head != NULL)
{
cur = pool->queue_head;
pool->queue_head = pool->queue_head->next;
free(cur);
}
//销毁互斥锁和条件变量
pthread_mutex_destroy(&(pool->queue_lock));
pthread_cond_destroy(&(pool->queue_cond));
free(pool);
pool = NULL;
return ret;
}