一、线程同步
协同步调,对公共区域进行访问
建议锁:应在访问前先拿锁
例:
#include <stdio.h>
#include <string.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>
pthread_mutex_t mutex; //定义互斥锁(想像为一个整数)
void *tfn(void *arg)
{
srand(time(NULL));
while(1){
pthread_mutex_lock(&mutex); //加锁 (锁--)
printf("hello ");
sleep(rand()%3); //cpu易主
printf("world\n");
pthread_mutex_unlock(&mutex); //解锁 (锁++)
sleep(rand()%3);
//在这解锁会使一直小写,线程不易挂起
}
return NULL;
}
int main()
{
pthread_t tid;
srand(time(NULL));
int ret=pthread_mutex_init(&mutex,NULL); //初始化互斥锁 (值为1)
if(ret != 0){
fprintf(stderr,"mutex init error:%s\n",strerror(ret));
exit(1);
}
pthread_create(&tid,NULL,tfn,NULL);
while(1){
pthread_mutex_lock(&mutex); //加锁
printf("HELLO ");
sleep(rand()%3); //cpu易主
printf("WORLD\n");
pthread_mutex_unlock(&mutex); //解锁
sleep(rand()%3);
}
pthread_join(tid,NULL);
pthread_mutex_destroy(&mutex); //销毁互斥锁
return 0;
}
二、线程同步方法
A.互斥量(锁) mutex
1.主要函数
成功返回0,错误返回错误号
1.pthread_mutex_init 初始化锁
#include<pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr);
//第二个参数是属性,没要求NULL
//restrict关键字:用来限定指针变量,被限定的指针所指的内存操作,必须有本指针完成
初始化一个互斥锁,初值为1
2.pthread_mutex_destroy 销毁锁
#include<pthread.h>
int pthread_mutex_destroy(pthread_mutex_t *mutex)
3.pthread_mutex_lock 加锁
#include<pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex)
4.pthread_mutex_trylock 尝试拿锁
#include<pthread.h>
int pthread_mutex_trylock(pthread_mutex_t *mutex)
5.pthread_mutex_unlock 解锁
#include<pthread.h>
int pthread_mutex_unlock(pthread_mutex_t *mutex)
2.基本类型
1.pthread_mutex_t
本质是一个结构体,可当作整数看待,只有0/1两种取值
3.使用步骤
4.注意
1.尽量保证锁的粒度越小越好,访问共享数据前加锁,结束立即解锁
2.互斥锁(看成整数) 初始值为1(本质是结构体)
加锁 – ,阻塞线程
解锁 ++,唤醒线程
3.try锁
成功 - -
失败返回错误号(EBUSY),不阻塞
B.死锁
一种使用锁的现象
C.读写锁
锁只有一把
读共享,写独占
写锁优先级高
以读方式给数据加锁——读锁
以写方式给数据加锁——写锁
1.读写锁特点
2.主要函数
#include<pthread.h>
int pthread_rwlock_destory(thread_rwlock_t *rwlock)
例:
#include<stdio.h>
#include<unistd.h>
#include<pthread.h>
int counter;
pthread_rwlock_t rwlock;
void *th_write(void *arg)
{
int t;
int i=(int)arg;
while(1){
pthread_rwlock_wrlock(&rwlock);
t=counter;
usleep(1000);
printf("======write %d: %lu: counter=%d ++counter=%d\n",i,pthread_self(),t,++counter);
pthread_rwlock_unlock(&rwlock);
usleep(10000);
}
return NULL;
}
void *th_read(void *arg)
{
int i=(int)arg;
while(1){
pthread_rwlock_rdlock(&rwlock);
printf("--------------------------read %d: %lu: %d\n",i,pthread_self(),counter);
pthread_rwlock_unlock(&rwlock);
usleep(2000);
}
return NULL;
}
int main(void)
{
int i;
pthread_t tid[8];
pthread_rwlock_init(&rwlock,NULL);
for(i=0;i<3;i++){
pthread_create(&tid[i],NULL,th_write,(void*)i);
}
for(i=0;i<5;i++){
pthread_create(&tid[i+3],NULL,th_read,(void*)i);
}
for(i=0;i<8;i++){
pthread_join(tid[i],NULL);
}
pthread_rwlock_destroy(&rwlock);
return 0;
}
D.条件变量
本身不是锁,但可以造成线程阻塞,与互斥锁配合使用,给多线程提供一个回合的场所。
主要函数
1.pthread_cond_init / pthread_cond_destory
初始化、销毁条件变量
#include<pthread.h>
//动态初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
/*也可以使用静态初始化方法
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
*/
2.pthread_cond_wait
阻塞等待一个条件变量
#include<pthread.h>
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
作用:
1.阻塞等待条件变量满足
2.释放已经掌握的互斥锁(解锁),相当于调pthread_mutex_unlock(&mutex)
1,2一步完成(原子操作)
3.当被唤醒时,pthread_cond_wait返回时,解除阻塞并重新加锁pthread_mutex_lock(&mutex)
3.pthread_cond_timewait
限时等待条件变量
4.pthread_cond_signal / pthread_cond_broadcast
发送一个信号给另外一个正在处于阻塞等待状态的(一个/多个)线程,使其脱离阻塞状态,继续执行.如果没有线程处在阻塞等待状态,pthread_cond_signal也会成功返回。
E.信号量
相当于初始化值为n的互斥量(n表示可以同时访问共享区的线程数)
主要函数
#include<semaphore.h>
int sem_init(sem_t *sem,int pshared,unsigned int value);
/*
pshared :
0: 线程间同步
非0: 进程间同步
value : n值(同时访问线程数)
*/
sem_wait: 信号量大于0,信号量–(加锁)
信号量等于0,造成线程阻塞
sem_post: 信号量++(解锁)
信号量的初值,决定占用信号量线程的个数
问题实践
1.生产者消费者问题
代码实现:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
//链表作为共享数据,需要被互斥量保护
struct msg{
struct msg *next;
int num;
};
struct msg *head;
//静态初始化条件变量和互斥量
pthread_cond_t has_product=PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
void *consumer(void *p)
{
struct msg *mp;
for(;;){
pthread_mutex_lock(&lock);
while(head==NULL){
pthread_cond_wait(&has_product,&lock);
}
mp=head;
head=mp->next; //模拟消费掉一个产品
pthread_mutex_unlock(&lock);
printf("-Consume %lu---%d\n",pthread_self(),mp->num);
free(mp);
sleep(rand()%5);
}
}
void *producer(void *p)
{
struct msg *mp;
for(;;){
mp=malloc(sizeof(struct msg));
mp->num=rand()%1000+1; //模拟生产一个产品
printf("-Produce----------------%d\n",mp->num);
pthread_mutex_lock(&lock);
mp->next=head;
head=mp; //头插法
pthread_mutex_unlock(&lock);
pthread_cond_signal(&has_product); //唤醒
sleep(rand()%5);
}
}
int main(int argc,char *argv[])
{
pthread_t pid,cid;
srand(time(NULL));
pthread_create(&pid,NULL,producer,NULL);
pthread_create(&cid,NULL,consumer,NULL);
pthread_join(pid,NULL);
pthread_join(cid,NULL);
return 0;
}
2.哲学家问题
#include <stdio.h>
#include <unistd.h>
//#include <stdlib.h>
#include <pthread.h>
pthread_mutex_t chopsticks[5];
void fun(int i){
//int k = 0;
while(1){
if(pthread_mutex_trylock(&chopsticks[i])==0){
if(pthread_mutex_trylock(&chopsticks[(i+1)%5])==0){
printf("zhexuejia %d is eatinging\n",i);
//k++;
usleep(10);
pthread_mutex_unlock(&chopsticks[i]);
pthread_mutex_unlock(&chopsticks[(i+1)%5]);
}else{
printf("zhexuejia %d is thinking\n",i);
//k++;
pthread_mutex_unlock(&chopsticks[i]);
usleep(10);
}
}else{
printf("zhexuejia %d is thinking\n",i);
//k++;
usleep(10);
}
}
}
int main(){
int t = 0;
while(t<5){
pthread_mutex_init(&chopsticks[t],NULL);
t++;
}
pthread_t tid1,tid2,tid3,tid4,tid5;
pthread_create(&tid1,NULL,(void*)fun,0);
pthread_create(&tid2,NULL,(void*)fun,1);
pthread_create(&tid3,NULL,(void*)fun,2);
pthread_create(&tid4,NULL,(void*)fun,3);
pthread_create(&tid5,NULL,(void*)fun,4);
pthread_join(tid1,NULL);
pthread_join(tid2,NULL);
pthread_join(tid3,NULL);
pthread_join(tid4,NULL);
pthread_join(tid5,NULL);
}
3.线程池实现
有问题!!!还没理解全!!
threadpool.h
#ifndef _THREQDPOOL_H
#define _THREQDPOOL_H
#include<pthread.h>
#include<string.h>
#include<errno.h>
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
//用户需求
struct Task{
void (*func)(void *arg);
void *arg;
};
struct ThreadPool{
struct Task *taskQ;
int quCapacity; //容量
int quCount; //目前的任务数
int quFount; //存在前
int quRear; //取在后
//工作者(消费者)线程
pthread_t *threadWorker;
//线程管理者
pthread_t *threadManager;
int minNum;
int maxNum;
int busyNum;//忙线程数
int liveNum;//存活线程个数
int exitNum;
//互斥锁
pthread_mutex_t mutexpool;//
pthread_mutex_t mutexbusy;//取出来任务就可以单独工作了,不用对整个线程池上锁
//条件变量
pthread_cond_t isFull;
pthread_cond_t isEmpty;
//销毁线程池 0运行
int shutdown
};
#define ADDNUM 2
#define DELNUM 2
typedef struct ThreadPool ThreadPool_t;
//线程池初始化接口
ThreadPool_t* threadPoolCreate(int min,int max,int quCapacity);
//给队列添加任务的接口
int threadPoolAdd(ThreadPool_t *pool,void(*func)(void*),void *arg);
//工作者线程调用函数
void *worker(void *arg);
//管理者线程调用函数
void *manager(void *arg);
//对线程池销毁的接口
int threadPoolDestory(ThreadPool_t *pool);
#endif
threadpool.c
#include "threadpool.h"
ThreadPool_t* threadPoolCreate(int min,int max,int quCapacity)
{
//int isError;
ThreadPool_t *pool = (ThreadPool_t *)malloc(sizeof(ThreadPool_t));
do{
if(pool==NULL)
{
perror("fail pool is 'NULL'");
//isError = -1;
//break;
return NULL;
}
memset(pool,0,sizeof(ThreadPool_t));//初始化线程池结构体
if(min<1)
min=1;
if(quCapacity<1)
quCapacity=1;
//初始化线程队列
pool->threadWorker=(pthread_t*)malloc(sizeof(pthread_t)*max);
if(pool->threadWorker==NULL)
{
perror("fail to pool->threadWorker malloc erreo");
//isError=-2;
//break;
return NULL;
}
memset(pool->threadWorker,0,sizeof(pthread_t)*max);
for(int i=0;i<min;i++)
{
if(pthread_create(&pool->threadWorker[i],NULL,worker,pool)!=0)
{
perror("fail to pthread_create Worker error");
//isError=-3;
//break;
return NULL;
}
}
pool->minNum=min;
pool->maxNum=max;
pool->busyNum=0;
pool->liveNum=min;
pool->exitNum=0;
//初始化任务队列
pool->quCapacity = quCapacity;
pool->taskQ = (struct Task*)malloc(sizeof(struct Task)*pool->quCapacity);
memset(pool->taskQ,0,sizeof(struct Task)*pool->quCapacity);
if(pool->taskQ==NULL)
{
perror("fail to pool->tastQ Worker error");
//isError=-4;
//break;
return NULL;
}
pool->quCount=0;
pool->quFount=0;
pool->quRear=0;
//初始化线程管理者
if(pthread_create(pool->threadManager,NULL,manager,pool)!=0)
{
perror("fail to pthread_create Manager error");
//isError=-5;
//break;
return NULL;
}
//初始化互斥锁和条件变量
if(pthread_mutex_init(&pool->mutexbusy,NULL)!=0 ||
pthread_mutex_init(&pool->mutexpool,NULL)!=0 ||
pthread_cond_init(&pool->isEmpty,NULL)!=0 ||
pthread_cond_init(&pool->isFull,NULL)!=0)
{
perror("fail to pthread_mutex_cond error");
//isError=-6;
//break;
return NULL;
}
//初始化线程销毁标志
pool->shutdown=0;
return pool; //!!!!
}while(0);
if(pool&&pool->threadWorker)
free(pool->threadWorker);
if(pool&&pool->taskQ)
free(pool->taskQ);
if(pool)
free(pool);
return NULL;
}//线程池创建
int threadPoolAdd(ThreadPool_t *pool,void(*func)(void*),void *arg)
{
if(pool==NULL)
{
return -1;
}
pthread_mutex_lock(&pool->mutexpool);
while(pool->quCount==pool->quCapacity && pool->shutdown==0)
{
pthread_cond_wait(&pool->isFull,&pool->mutexpool);
}
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexpool);
return 0;
}
pool->taskQ[pool->quFount].func=func;
pool->taskQ[pool->quFount].arg=arg;
pool->quFount = (pool->quFount + 1) % pool->quCapacity;
pool->quCount++;
pthread_cond_signal(&pool->isEmpty);
pthread_mutex_unlock(&pool->mutexpool);
}
void threadExit(ThreadPool_t *pool)
{
if(pool == NULL)
return ;
for(int i = 0;pool->maxNum;i++)
{
if(pool->threadWorker[i] == pthread_self())
{
pool->threadWorker[i] == 0;
break;
}
}
printf("工作者线程 %ld 死亡\n",pthread_self());
pthread_mutex_unlock(&pool->mutexpool);
pthread_exit(NULL);
}
void *worker(void *arg)
{
pthread_detach(pthread_self());//子线程设成分离态,自己回收资源
ThreadPool_t *pool=(ThreadPool_t*)arg;
while(1)
{
pthread_mutex_lock(&pool->mutexpool);
while(pool->quCount == 0 && pool->shutdown == 0) {
pthread_cond_wait(&pool->isEmpty, &pool->mutexpool);
if (pool->exitNum > 0 && pool->shutdown == 0)
{
pool->exitNum--;
if(pool->liveNum > pool->minNum)
{
pool->liveNum--;
threadExit(pool);
}
}
}
if(pool->shutdown)
{
//pthread_mutex_unlock(&pool->mutexpool);
threadExit(pool);
}
//从任务队列取任务,并解锁
struct Task *task = (struct Task*)malloc(sizeof(struct Task));
memset(task,0,sizeof(struct Task));
task->func = pool->taskQ[pool->quRear].func;
task->arg = pool->taskQ[pool->quRear].arg;
pool->quRear = (pool->quRear + 1) % pool->quCapacity;
pool->quCount--;
pthread_cond_signal(&pool->isFull);
pthread_mutex_unlock(&pool->mutexpool);
printf("工作者线程 %ld 开始工作--------\n",pthread_self());
pthread_mutex_lock(&pool->mutexbusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexbusy);
task->func(task->arg);
printf("工作者线程 %ld 退出工作--------\n",pthread_self());
task->func = NULL;
free(task->arg);
task->arg = NULL;
free(task);
task = NULL;
pthread_mutex_lock(&pool->mutexbusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexbusy);
}
}
void *manager(void *arg)
{
ThreadPool_t *pool=(ThreadPool_t *)arg;
while(1)
{
sleep(2);
pthread_mutex_lock(&pool->mutexbusy);
int busyNum=pool->busyNum;
pthread_mutex_unlock(&pool->mutexbusy);
pthread_mutex_lock(&pool->mutexpool);
int liveNum=pool->liveNum;
pthread_mutex_unlock(&pool->mutexpool);
if(pool->shutdown == 1)
{
printf("管理者线程已退出\n");
pthread_exit(NULL);
}
//添加线程
//当前任务数 > 当前线程数 && 当前线程数 < 最大线程数,添加ADDNUM个
pthread_mutex_lock(&pool->mutexpool);
int addnum = 0;
for(int i = 0 ; i < pool->maxNum && pool->quCount > liveNum ; i++)
{
if(pool->threadWorker[i] == 0 && pool->shutdown == 0)
{
pthread_create(&pool->threadWorker[i],NULL,worker,pool);
printf("工作者线程 %ld 被管理者线程赋予生命-------",pthread_self());
addnum++;
pool->liveNum++;
if(addnum == ADDNUM || pool->liveNum == pool->maxNum)
{
break;
}
}
}
pthread_mutex_unlock(&pool->mutexpool);
//删除线程
//工作中的线程数 *2 < 存活线程数 && 存活线程数 > 最小线程数
if(busyNum*2 <liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexpool);
pool->exitNum =DELNUM;
pthread_mutex_unlock(&pool->mutexpool);
for(int i = 0; i<DELNUM;i++)
{
pthread_cond_signal(&pool->isEmpty);
}
}
}
}
int threadPoolDestory(ThreadPool_t *pool)
{
if(pool == NULL)
return -1;
pool->shutdown = 1;
pthread_join(*pool->threadManager,NULL);
for(int i = 0;i < pool->liveNum;i++)
{
pthread_cond_signal(&pool->isEmpty);
}
sleep(3);
if(pool && pool->threadWorker)
{
free(pool->threadWorker);
}
if(pool && pool->taskQ)
{
free(pool->taskQ);
}
//删除互斥锁和条件变量
if(pthread_mutex_destroy(&pool->mutexbusy)!=0 ||
pthread_mutex_destroy(&pool->mutexpool)!=0 ||
pthread_cond_destroy(&pool->isEmpty)!=0 ||
pthread_cond_destroy(&pool->isFull)!=0)
{
perror("fail to pthread_mutex_cond destroy error");
return -2;
}
free(pool);
return 0;
}
void func(void *arg)
{
int num=*(int*)arg;
printf("thread %ld is working , number = %d,tid = %ld\n",num,pthread_self());
usleep(1000);
}
int main()
{
ThreadPool_t* pool=threadPoolCreate(3,10,100);
for(int i=0;i<100;i++)
{
int *num=(int*)malloc(sizeof(int));
*num=i+100;
threadPoolAdd(pool,func,num);
}
sleep(30); //使所有线程都工作完
threadPoolDestory(pool);
return 0;
}
正确的代码
text.h
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
#include<pthread.h>
#include<string.h>
#include<errno.h>
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#define NUMBER 5
// 任务结构体
typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;
// 线程池结构体
struct ThreadPool
{
// 任务队列
Task* taskQ;
int queueCapacity; // 容量
int queueSize; // 当前任务个数
int queueFront; // 队头 -> 取数据
int queueRear; // 队尾 -> 放数据
pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作的线程ID
int minNum; // 最小线程数量
int maxNum; // 最大线程数量
int busyNum; // 忙的线程的个数
int liveNum; // 存活的线程的个数
int exitNum; // 要销毁的线程个数
pthread_mutex_t mutexPool; // 锁整个的线程池
pthread_mutex_t mutexBusy; // 锁busyNum变量
pthread_cond_t notFull; // 任务队列是不是满了
pthread_cond_t notEmpty; // 任务队列是不是空了
int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0
};
typedef struct ThreadPool ThreadPool;
// 创建线程池并初始化
ThreadPool *threadPoolCreate(int min, int max, int queueSize);
// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);
// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);
// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);
//
// 工作的线程(消费者线程)任务函数
void* worker(void* arg);
// 管理者线程任务函数
void* manager(void* arg);
// 单个线程退出
void threadExit(ThreadPool* pool);
#endif // _THREADPOOL_H
text.c
#include "text.h"
ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; // 和最小个数相等
pool->exitNum = 0;
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0)
{
printf("mutex or condition init fail...\n");
break;
}
// 任务队列
pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
pool->queueCapacity = queueSize;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
// 创建线程
pthread_create(&pool->managerID, NULL, manager, pool);
for (int i = 0; i < min; ++i)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
} while (0);
// 释放资源
if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool && pool->taskQ) free(pool->taskQ);
if (pool) free(pool);
return NULL;
}
int threadPoolDestroy(ThreadPool* pool)
{
if (pool == NULL)
{
return -1;
}
// 关闭线程池
pool->shutdown = 1;
// 阻塞回收管理者线程
pthread_join(pool->managerID, NULL);
// 唤醒阻塞的消费者线程
for (int i = 0; i < pool->liveNum; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
// 释放堆内存
if (pool->taskQ)
{
free(pool->taskQ);
}
if (pool->threadIDs)
{
free(pool->threadIDs);
}
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{
pthread_mutex_lock(&pool->mutexPool);
while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
{
// 阻塞生产者线程
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
return;
}
// 添加任务
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize++;
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
int threadPoolBusyNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
int threadPoolAliveNum(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutexPool);
int aliveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return aliveNum;
}
void* worker(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (1)
{
pthread_mutex_lock(&pool->mutexPool);
// 当前任务队列是否为空
while (pool->queueSize == 0 && !pool->shutdown)
{
// 阻塞工作线程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// 判断是不是要销毁线程
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
// 判断线程池是否被关闭了
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
// 从任务队列中取出一个任务
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// 移动头结点
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;
// 解锁
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
free(task.arg);
task.arg = NULL;
printf("thread %ld end working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
void* manager(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
while (!pool->shutdown)
{
// 每隔3s检测一次
sleep(3);
// 取出线程池中任务的数量和当前线程的数量
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
// 取出忙的线程的数量
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
// 添加线程
// 任务的个数>存活的线程个数 && 存活的线程数<最大线程数
if (queueSize > liveNum && liveNum < pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < NUMBER
&& pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// 销毁线程
// 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// 让工作的线程自杀
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
return NULL;
}
void threadExit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}
void taskFunc(void* arg)
{
int num = *(int*)arg;
printf("thread %ld is working, number = %d\n",
pthread_self(), num);
sleep(1);
}
int main()
{
// 创建线程池
ThreadPool* pool = threadPoolCreate(3, 10, 100);
for (int i = 0; i < 100; ++i)
{
int* num = (int*)malloc(sizeof(int));
*num = i + 100;
threadPoolAdd(pool, taskFunc, num);
}
sleep(30);
threadPoolDestroy(pool);
return 0;
}