线程终止
Linux下有两种方式可以使线程终止:
- 通过return 从线程函数返回
- 通过调用函数pthread_exit()使线程退出
#include<pthread.h>
void pthread_exit(void *retval);
两种特殊情况
- 在主线程中,如果从main函数返回或是调用了exit函数退出主线程,则整个进程终止,此时进程中所有线程也将终止
- 在主线程中,如果调用pthread_exit()函数,则仅仅是主线程消亡,进程不会结束,进程内其他线程也不会终止,直到所有线程结束,进程才会结束。
两个问题:
- 线程终止必须释放其占有的临界资源,采用pthread_cleanup_push()、pthread_cleanup_pop()函数用于自动释放资源
#include<pthread.h>
#define pthread_cleanup_push(routine,arg){
struct _pthread_cleanup_buffer buffer;
_pthread_cleanup_push(&buffer,(routine),(srg));
#define pthread_cleanup_pop
_pthread_cleanup_pop(&buffer,(exeute));}
- 线程间的同步问题
函数pthread_join()用来等待一个线程的结束
#include<pthread.h>
void pthread_exit(void *retval);
int pthread_join(pthread_t th,void *thread_return);
int pthread_detach(pthrerad_t th);
一个线程仅允许一个线程使用pthread_join()等待它的终止,并且被等待的线程应该处于可join状态。为了避免内存泄漏,所有线程终止时,要么被设为DETACHED,要么使用pthread_join()来回收资源。
私有数据
一键多值:好像是对一个变量进行访问,其实是在访问不同的数据
操作线程私有数据的函数主要有:
#include<pthread.h>
int pthread_key_create(pthread_key_t *key,void (*destr_function)(void*));//创建一个键
int pthread_setspecific(pthread_key_t key,const void* pointer);//为一个键设置进程私有数据
void *pthread_getspecific(pthread_key_t key);//从一个键读取线程私有数据
int pthread_key_delete(pthread_key_t key);//删除一个键
- pthread_key_create:第一个参数为指向键值的指针,第二个参数为一个函数指针,如果指针不为空,则在线程退出时将以key所关联的数据为参数调用destr_function(),释放分配的缓冲区。
线程同步
处理线程间的同步问题最常用的有互斥锁、条件变量、异步信号。
操作 | 函数 |
---|---|
初始化 | pthread_mutex_init() |
加锁 | pthread_mutex_lock() |
测试加锁 | pthread_mutex_trylock() |
解锁 | pthread_mutex_unlock() |
销毁 | pthread_mutex_destory() |
使用条件变量主要包括两个动作,一个等待使用资源的线程等待“条件变量被设置为真”,另一个线程在使用完资源后“设置条件为真”,从而保证线程间的同步。
操作 | 函数 |
---|---|
初始化 | pthread_cond_init |
无条件等待 | pthread_cond_wait |
计时等待 | pthread_cond_timewait |
解除特定线程的阻塞 | pthread_cond_signal |
解除所有线程的阻塞 | pthread_cond_broadcast |
清除条件变量 | pthread_cond_deestory |
多线程经典案例
生产者消费者共享缓冲区,生产者向缓冲区中放数据,消费者从缓冲取中取数据,当缓冲区中被放满时,生产者进程就必须进入挂起状态,直到消费者从缓冲中取走数据时,生产者才能继续向缓冲区中存放数据,同样当缓冲取中没有数据时,消费者进程就必须进入挂起休眠状态,直到生产者向缓冲区中放入数据时,消费者才能被唤醒继续从缓冲区中取走数据。
需要注意的问题:
- 缓冲区为空时,消费者不能再进行消费
- 缓冲区为满时,生产者不能再进行生产
- 在一个线程进行生产或消费时,其余线程不能再进行生产或消费,因此要保证线程的同步。
- 生产者线程在生产之前,需要wait直到获取自己所需要的信号后,才会进行生产操作,消费者线程在消费之前,需要wait直到获取自己所需要的信号后,才会进行消费操作。
- 为避免多个线程同时访问资源造成混乱,需要对共享资源加锁,从而保证某时刻只有一个线程在访问共享资源。
实现代码如下:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include<stdlib.h>
#define N 100
#define true 1
#define producerNum 10
#define consumerNum 5
#define sleepTime 1000
typedef int semaphore;
typedef int item;
item buffer[N] = {0};
int in = 0;
int out = 0;
int proCount = 0;
semaphore mutex = 1, empty = N, full = 0, proCmutex = 1;
void * producer(void * a){
while(true){
while(proCmutex <= 0);
proCmutex--;
proCount++;
printf("生产一个产品ID%d, 缓冲区位置为%d\n",proCount,in);
proCmutex++;
while(empty <= 0){
printf("缓冲区已满!\n");
}
empty--;
while(mutex <= 0);
mutex--;
buffer[in] = proCount;
in = (in + 1) % N;
mutex++;
full++;
sleep(sleepTime);
}
}
void * consumer(void *b){
while(true){
while(full <= 0){
printf("缓冲区为空!\n");
}
full--;
while(mutex <= 0);
mutex--;
int nextc = buffer[out];
buffer[out] = 0;//消费完将缓冲区设置为0
out = (out + 1) % N;
mutex++;
empty++;
printf(“消费一个产品ID%d,缓冲区位置为%d\n", nextc,out);
sleep(sleepTime);
}
}
int main()
{
pthread_t threadPool[producerNum+consumerNum];//创建线程池
int i;
for(i = 0; i < producerNum; i++){
pthread_t temp;
if(pthread_create(&temp, NULL, producer, NULL) == -1){
printf("ERROR, fail to create producer%d\n", i);
exit(1);
}
threadPool[i] = temp;
}//创建生产者进程放入线程池
for(i = 0; i < consumerNum; i++){
pthread_t temp;
if(pthread_create(&temp, NULL, consumer, NULL) == -1){
printf("ERROR, fail to create consumer%d\n", i);
exit(1);
}
threadPool[i+producerNum] = temp;
}//创建消费者进程放入线程池
void * result;
for(i = 0; i < producerNum+consumerNum; i++){
if(pthread_join(threadPool[i], &result) == -1){
printf("fail to recollect\n");
exit(1);
}
}//运行线程池
return 0;
}
五个哲学家绕着圆桌坐,每个哲学家面前有一盘面,两人之间有一支筷子,这样每个哲学家左右各有一支筷子。哲学家有2个状态,思考或者拿起筷子吃饭。如果哲学家拿到一只筷子,不能吃饭,直到拿到2只才能吃饭,并且一次只能拿起身边的一支筷子。一旦拿起便不会放下筷子直到把饭吃完,此时才把这双筷子放回原处。如果,很不幸地,每个哲学家拿起他或她左边的筷子,那么就没有人可以吃到饭了。
至多只允许有四位哲学家同时去拿左边的筷子,最终能保证至少有一位哲学家能够进餐,并在用餐完毕后能释放他占用的筷子,从而使别的哲学家能够进餐;
假设哲学家的编号是A、B、C、D、E,筷子编号是1、2、3、4、5,哲学家和筷子围成一圈如下图所示:
代码实现:
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
pthread_mutex_t chopstick[6];
void *eat_think(void *arg)
{
char phi = *(char*)arg;
int left, right;
int i = 0;
switch(phi)
{
case 'A':
left = 5;
right = 1;
break;
case 'B':
left = 1;
right = 2;
break;
case 'C':
left = 2;
right = 3;
break;
case 'D':
left = 3;
right = 4;
break;
case 'E':
left = 4;
right = 5;
break;
default:
break;
}
while(1)
{
sleep(2);
pthread_mutex_lock(&chopstick[left]); //拿起左边的筷子
printf("哲学家%c拿起筷子%d\n", phi, left);
if(pthread_mutex_trylock(&chopstick[right]) == EBUSY){ //拿起右边的筷子
pthread_mutex_unlock(&chopstick[left]); //右边的筷子被拿走则放下左边的筷子
continue;
}
printf("哲学家%c拿起筷子%d\n", phi, right);
printf("哲学家%c吃饭\n", phi);
sleep(2); //吃饭
pthread_mutex_unlock(&chopstick[left]); //放下左边的筷子
printf("哲学家%c放下筷子%d\n", phi, left);
pthread_mutex_unlock(&chopstick[right]); //放下右边的筷子
printf("哲学家%c放下筷子%d\n", phi, right);
}
}
int main()
{
pthread_t A, B, C, D, E; //五位哲学家
for(int i = 0; i < 5; ++i)
{
pthread_mutex_init(&chopstick[i], NULL);
}
pthread_create(&A, NULL, eat_think, "A");
pthread_create(&B, NULL, eat_think, "B");
pthread_create(&C, NULL, eat_think, "C");
pthread_create(&D, NULL, eat_think, "D");
pthread_create(&E, NULL, eat_think, "E");
pthread_join(A, NULL);
pthread_join(B, NULL);
pthread_join(C, NULL);
pthread_join(D, NULL);
pthread_join(E, NULL);
return 0;
}
为什么要使用线程池?
- 提高程序的执行效率
如果程序中有大量短时间任务的线程任务,由于创建和销毁线程需要和底层操作系统交互,大量时间都耗费在创建和销毁线程上,因而比较浪费时间,系统效率很低,而线程池里的每一个线程任务结束后,并不会死亡,而是再次回到线程池中成为空闲状态,等待下一个对象来使用,因而借助线程池可以提高程序的执行效率 - 控制线程的数量,防止程序崩溃
线程池中线程数量是一定的,可以有效避免出现内存溢出
#include<stdio.h>
#include<pthread.h>
#include<malloc.h>
#include<errno.h>
//要创建任务队列,因此需要定义任务结点结构
typedef struct task{
void *(*run)(void *arg); //任务回调函数
void *arg; //任务回调函数的参数
struct task *next; //下一个任务结点
}task_t;
//封装互斥量和条件变量为一个condition_t
typedef struct
{
pthread_mutex_t mutex; //互斥量
pthread_cond_t cond; //条件变量
}condition_t;
//线程池结构
typedef struct
{
condition_t ready; //同步、互斥条件
task_t *first; //任务队列头结点指针
task_t *last; //任务队列尾结点指针
int count; //当前线程数量
int free; //当前空闲线程数量
int flag; //销毁线程池标志
int max_threads; //线程池中线程总数上限
}threadpool_t;
//互斥量的初始化
void condition_init(condition_t *cond){
pthread_mutex_init(&cond->mutex ,NULL);
pthread_cond_init(&cond->cond ,NULL);
}
void condition_lock(condition_t *cond){
pthread_mutex_lock(&cond->mutex );
}
void condition_unlock(condition_t *cond){
pthread_mutex_unlock(&cond->mutex );
}
int condition_wait(condition_t *cond){
return pthread_cond_wait(&cond->cond ,&cond->mutex );
}
int condition_timedwait(condition_t *cnd,const struct timespec *abstime){
return pthread_cond_timedwait(&cond->cond,&cond->mutex,abstime);
}
void condition_signal(condition_t *cond){
pthread_cond_signal(&cond->cond );
}
void condition_broadcast(condition_t *cond){
pthread_cond_broadcast(&cond->cond );
}
void condition_destroy(condition_t *cond){
pthread_mutex_destroy(&cond->mutex);
pthread_cond_destroy(&cond->cond);
}
//初始化线程函数
void threadpool_init(threadpool_t *threadpool,int max_threads){
condition_init(&(threadpool->ready));
threadpool->first =NULL;//设置任务队列头指针、尾指针
threadpool->last =NULL;
threadpool->count =0;
threadpool->free =0;//初始化当前线程数量和当前空闲线程数量
threadpool->max_threads =max_threads;//设置线程池中线程总数上限
threadpool->flag =0;//将销毁线程池标志设置为0,0为不销毁,1为销毁
}
//线程处理函数
void *route(void *arg)
{
//分离线程
pthread_detach(pthread_self());
//获取线程池结构指针
threadpool_t *threadpool = (threadpool_t *)arg;
//超时标志
int timeout = 0;
while(1)
{
//上锁
condition_lock(&threadpool->ready);
//超时标志重置
timeout = 0;
//当线程创建好,但任务尚未执行时,或者上次任务执行完毕之后再次进入循环,该线程属于空闲线程,所以空闲线程数+1
threadpool->free++;
//当任务队列为空并且还没有开始销毁线程时,线程进入等待状态
while(threadpool->first == NULL && threadpool->flag == 0)
{
//获取系统当前时间
struct timespec wait_time;
clock_gettime(CLOCK_REALTIME, &wait_time);
//超时等待时间为3秒钟, 如果3秒内没有被唤醒(当新任务来时,会发送唤醒信号),则返回。
wait_time.tv_sec += 3;
int ret = condition_timedwait(&threadpool->ready, &wait_time);
if(ret == ETIMEDOUT)
{
printf("%#x thread timeout!\n", (int)pthread_self());
//设置超时标志
timeout = 1;
break;
}
}
//程序能走到这里,有三种情况: 1.等待超时; 2.任务队列不为空 3.被销毁线程池时发送的广播信号唤醒
//而无论哪种情况,空闲线程都应该 - 1
threadpool->free--;
//第一种情况: 等待超时而且当前无任务可执行,则销毁该线程
if(timeout == 1 && threadpool->first == NULL)
{
threadpool->count--;
condition_unlock(&threadpool->ready);
break;
}
//第二种情况: 任务队列不为空, 则执行任务
if(threadpool->first != NULL)
{
//取出队头任务
task_t *task = threadpool->first;
threadpool->first = threadpool->first->next;
//为了避免任务执行时间过长,别的线程获取不到锁,所以先解锁后加锁,绕过任务执行函数
condition_unlock(&threadpool->ready); //解锁
task->run(task->arg); //执行任务
condition_lock(&threadpool->ready); //加锁
//任务执行完毕释放任务结点
free(task);
}
//第三种情况: 下达销毁线程池命令, 且当前无任务可执行,则销毁线程
if(threadpool->flag == 1 && threadpool->first == NULL)
{
threadpool->count--;
//说明是线程池最后一个线程, 应该发送一个唤醒信号让销毁线程池的函数继续往下执行
if(threadpool->count == 0)
{
condition_signal(&threadpool->ready);
}
//解锁
condition_unlock(&threadpool->ready);
break;
}
//这里的解锁主要是为任务执行完毕后的解锁
condition_unlock(&threadpool->ready);
}
}
//向线程池的任务队列中添加任务
void threadpool_add_task(threadpool_t *threadpool, void *(*run)(void *), void *arg)
{
//上锁(对队列进行增删查改需要互斥访问)
condition_lock(&threadpool->ready);
//构造新任务结点
task_t *new_task = malloc(sizeof(task_t));
new_task->run = run;
new_task->arg = arg;
new_task->next = NULL;
//如果任务队列为空
if(threadpool->first == NULL)
{
threadpool->first = new_task;
}
else
{//如果任务队列不为空
threadpool->last->next = new_task;
}
//修正任务队列尾指针
threadpool->last = new_task;
//如果有空闲线程,则说明可能有空闲线程在等待执行任务
if(threadpool->flag > 0)
{
//唤醒一个在等待状态的空闲线程
condition_signal(&threadpool->ready);
}
//没有空闲线程, 而且当前总线程数小于线程池最大线程个数,则要创建一个新线程
else if(threadpool->count < threadpool->max_threads)
{
pthread_t tid;
//创建出新线程并将线程池结构指针作为线程处理函数的参数
pthread_create(&tid, NULL, route, threadpool);
//当前线程池总线程数数加1
threadpool->count ++;
}
//解锁
condition_unlock(&threadpool->ready);
}
//销毁线程池
void threadpool_destroy(threadpool_t *threadpool)
{
//说明有线程已经在处理销毁线程池,其他线程不能进去
if(threadpool->flag == 1)
{
return;
}
//上锁
condition_lock(&threadpool->ready);
threadpool->flag = 1; //标志已经开始销毁线程池
//如果有空闲线程, 则说明有线程处于等待状态,则要将它们全部唤醒,让它们自行销毁
if(threadpool->free > 0)
{
condition_broadcast(&threadpool->ready);
}
else
{//没有空闲线程,则等待所有正在执行任务的线程执行完自己的任务后自行销毁
while(threadpool->count > 0)
{
//阻塞等待所有正在执行任务的线程执行完毕,当最后一个线程执行完毕时,会给发送一个唤醒信号让销毁线程也退出
condition_wait(&threadpool->ready);
}
}
//销毁
condition_destroy(&threadpool->ready);
//解锁
condition_unlock(&threadpool->ready);
}