生产者消费者
生产者消费者模型的描述: 两个进程共享一个缓冲区,一个进程称为生产者向缓冲区中放数据,另一个称为消费者从缓冲取中取数据,当缓冲区中被放时,生产者进程就必须可进入挂起状态,直到消费者从缓冲中取走数据时,生产者才能继续向缓冲区中存放数据,同样当缓冲取中没有数据时,消费者进程就必须进入挂起休眠状态,直到生产者向缓冲区中放入数据时,消费者才能被唤醒继续从缓冲区中取走数据。
基于互斥所和条件变量
要注意的情况:
1.生产者生产时候,消费者不能消费
2.消费者消费时候,生产者不能生产产品
3.队列为空时候,提醒生产者生产产品
4.队列满的时候,提醒消费者消费
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#define MAX 6
pthread_mutex_t mutex;
pthread_cond_t full; //队列是否已满
pthread_cond_t empty; //队列是否已空
int head,tail;
void *producer(void *arg)
{
int i;
for(i = 0; i < MAX * 3; i++)
{
pthread_mutex_lock(&mutex);
while((tail + 1) % MAX == head)
{
printf("仓库已满!producer waiting\n");
pthread_cond_wait(&full, &mutex); //等待消费者消费
}
tail = (tail + 1) % MAX;
printf("tail = %d\n",tail);
pthread_cond_signal(&empty); //发送队列未空消息
sleep(1);
pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg)
{
int i;
for(i = 0; i < MAX * 3; i++)
{
pthread_mutex_lock(&mutex);
while(tail % MAX == head)
{
printf("仓库已空!consumer waiting\n");
pthread_cond_wait(&empty, &mutex); //等待生产者生产
}
head = (head + 1) % MAX;
printf("head = %d\n",head);
pthread_cond_signal(&full); //发送队列未满消息
pthread_mutex_unlock(&mutex);
}
}
int main()
{
pthread_t pro,con;
int statu1,statu2;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&full, NULL);
pthread_cond_init(&empty, NULL);
pthread_create(&pro, NULL, producer, NULL);
pthread_create(&con, NULL, consumer, NULL);
pthread_join(pro, (void *) &statu1);
pthread_join(con, (void *) &statu2);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&full);
pthread_cond_destroy(&empty);
return 0;
}
哲学家问题
问题: 有五个哲学家绕着圆桌坐,每个哲学家面前有一盘面,两人之间有一支筷子,这样每个哲学家左右各有一支筷子。哲学家有2个状态,思考或者拿起筷子吃饭。如果哲学家拿到一只筷子,不能吃饭,直到拿到2只才能吃饭,并且一次只能拿起身边的一支筷子。一旦拿起便不会放下筷子直到把饭吃完,此时才把这双筷子放回原处。如果,很不幸地,每个哲学家拿起他或她左边的筷子,那么就没有人可以吃到饭了。
哲学家进餐问题是一个多线程运用的经典例子,涉及到线程同步/互斥,临界区访问问题以及死锁问题
我自己设置如果五位哲学家都吃上饭了那么程序就退出,其它的代码中有非常详细的注释。
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <math.h>
/* 如果5位哲学家都吃上饭了就退出 */
pthread_mutex_t chopstick[6]; //定义锁——1只筷子1个锁
typedef struct _data
{
char phi;
int flag;
}DATA;
DATA test[5];
//初始化
void fun()
{
int i = 0;
char ch = 'A';
for(i = 0; i < 5; i++)
{
test[i].phi = ch + i;
test[i].flag = 0;
}
}
//初始化场景——每位哲学家左右为几号筷子
void get(int *left,int *right,char phi)
{
switch (phi)
{
case 'A':
*left = 5;
*right = 1;
break;
case 'B':
*left = 1;
*right = 2;
break;
case 'C':
*left = 2;
*right = 3;
break;
case 'D':
*left = 3;
*right = 4;
break;
case 'E':
*left = 4;
*right = 5;
break;
}
}
void *eat_think(void *arg)
{
char phi = *(char *)arg; //那位哲学家
int left,right;
get(&left,&right,phi);
int i = 0;
for(;;)
{
usleep(10); //将线程挂起,单位为毫秒
pthread_mutex_lock(&chopstick[left]); //将当前哲学家的左边筷子加?
//判断5位哲学家是否都吃上饭
int sum = 0;
for(i = 0; i < 5; i++)
if(test[i].flag >= 1)
sum++;
if(sum >= 5)
{
printf("\e[1;32m%c哲学家吃上饭啦!\e[0m\n",phi);
break;
}
//pthread_mutex_lock(&chopstick[left]); //将当前哲学家的左边筷子加?
printf("Philosopher %c fetches chopstick %d\n", phi,left); //当前哲学家拿到左边筷子
//能吃上饭的前提为拿到两只筷子
//尝试给当前哲学家右边筷子加?
//如果右边筷子已经被加过?则符合if条件——>给左边筷子解?
//如果右边筷子没有被加?则加?
if(pthread_mutex_trylock(&chopstick[right]) == EBUSY)
{
pthread_mutex_unlock(&chopstick[left]);
continue;
}
printf("Philosopher %c fetches chopstick %d\n", phi,right); //当前哲学家拿到右边筷子
printf("\e[1;34mPhilosopher %c is eating.\e[0m\n",phi); //当前哲学家吃上饭了
for(i = 0; i < 5; i++)
{
if(test[i].phi == phi)
{
if(test[i].flag == 1)
break;
test[i].flag++;
}
}
usleep(10);
pthread_mutex_unlock(&chopstick[left]); //给当前哲学家的左边筷子解?
printf("Philosopher %c release chopstick %d\n", phi,left); //当前哲学家放下了左边筷子
pthread_mutex_unlock(&chopstick[right]); //给当前哲学家爱的右边筷子解?
printf("Philosopher %c release chopstick %d\n", phi,right); //当前哲学家放下了右边筷子
}
}
int main()
{
pthread_t A,B,C,D,E; //5个哲学家
int i;
fun();
//初始化锁
for (i = 0; i < 5; i++)
pthread_mutex_init(&chopstick[i],NULL);
//创建5个线程代表5个哲学家
pthread_create(&A, NULL, eat_think, (void *)"A");
pthread_create(&B, NULL, eat_think, (void *)"B");
pthread_create(&C, NULL, eat_think, (void *)"C");
pthread_create(&D, NULL, eat_think, (void *)"D");
pthread_create(&E, NULL, eat_think, (void *)"E");
//等待线程结束
pthread_join(A, NULL);
pthread_join(B, NULL);
pthread_join(C, NULL);
pthread_join(D, NULL);
pthread_join(E, NULL);
return 0;
}
线程池
概念: 线程池是用于实现计算机程序中的执行的并发性的软件设计模式。通常也成为复制工作者或工作人员模型,线程池维护多个线程,等待由监督程序分配任务 以便并发执行。通过维护一个线程池,该模型可以提高性能并避免由于频繁创建和 销毁短期任务的线程而导致的执行延迟。
为什么要使用线程池: 多线程在使用过程中会带来调度上的开销,当线程执行时间远大于线程创建的时间时, 开销可以忽略不计,但当线程执行时间与创建时间处于同一数量级时,线程创建与 切换线程状态,销毁线程这些操作所带来的系统开销往往是影响整个程序的处理速度的罪魁祸首。
流程图:
下面是我实现的一个简易的线程池,基础功能都已实现。
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include<sys/types.h>
#define MAX_THREAD_NUM 6
typedef struct work
{
void *(* process)(void *arg); //任务函数指针
void *arg; //参数
struct work *next;
}Work;
//线程池结构
typedef struct threadpoll
{
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_t *pthreadid;
Work *tasks; //任务队列
int flag; //标识线程池状态
int max_thread_num; //线程池中允许的最大活动线程
int cur_task; //任务队列当前任务数
}Pool;
void pool_init(); //初始化线程池
int pool_add(void *(* process)(void *arg), void *arg); //添加任务
void *run(void *arg); //工作线程
int pool_destroy(); //销毁线程池
Pool *pool = NULL; //全局变量
void pool_init()
{
pool = (Pool *)malloc(sizeof(Pool));
pthread_mutex_init(&(pool->mutex), NULL);
pthread_cond_init(&(pool->cond), NULL);
//初始化任务队列
pool->tasks = NULL;
pool->pthreadid = (pthread_t *)malloc(sizeof(pthread_t) * MAX_THREAD_NUM);
pool->flag = 0;
pool->max_thread_num = MAX_THREAD_NUM;
pool->cur_task = 0;
int i = 0;
for(i = 0; i < MAX_THREAD_NUM; i++)
pthread_create(&(pool->pthreadid[i]), NULL, run, NULL);
}
int pool_add(void *(* process)(void *arg), void *arg)
{
//创建一个新任务
Work *new_work = (Work *)malloc(sizeof(Work));
new_work->process = process;
new_work->arg = arg;
new_work->next = NULL;
//加锁
pthread_mutex_lock(&(pool->mutex));
//将任务添加到任务队列末尾
Work *temp = pool->tasks;
if(temp)
{
while(temp->next)
temp = temp->next;
temp->next = new_work;
}
else
pool->tasks = new_work;
pool->cur_task++; //任务数+1
//唤醒线程去执行此任务,如果线程都在忙碌,这句就没有作用
pthread_cond_signal(&(pool->cond));
pthread_mutex_unlock(&(pool->mutex));
return 0;
}
void *run(void *arg)
{
printf("start thread %ld\n",pthread_self());
while(1)
{
pthread_mutex_lock(&(pool->mutex));
//若等待队列为0且不销毁线程池则处于阻塞状态
while(pool->cur_task == 0 && !pool->flag)
{
printf("thread %ld is waiting\n",pthread_self());
pthread_cond_wait(&(pool->cond), &(pool->mutex));
}
//销毁线程池
if(pool->flag)
{
pthread_mutex_unlock(&(pool->mutex));
printf("thread %ld will exit\n",pthread_self());
pthread_exit(NULL);
}
printf("thread %ld is starting to work\n",pthread_self());
//等待队列长度-1,取出任务
pool->cur_task--;
Work *temp = pool->tasks;
pool->tasks = pool->tasks->next;
pthread_mutex_unlock(&(pool->mutex));
//执行任务
(*(temp->process))(temp->arg);
free(temp);
temp = NULL;
}
}
int pool_destroy()
{
if(pool->flag)
return -1;
pool->flag = 1;
//唤醒所有线程
pthread_cond_broadcast(&(pool->cond));
//等待所有线程结束
int i = 0;
for(i = 0; i < pool->max_thread_num; i++)
pthread_join(pool->pthreadid[i], NULL);
free(pool->pthreadid);
//销毁等待队列
Work *temp = NULL;
while(pool->tasks)
{
temp = pool->tasks;
pool->tasks = pool->tasks->next;
free(temp);
}
//销毁互斥锁与条件变量
pthread_mutex_destroy(&(pool->mutex));
pthread_cond_destroy(&(pool->cond));
free(pool);
pool = NULL;
return 0;
}
void *process(void *arg)
{
printf("thread %ld is working on task %d\n",pthread_self(), *(int *)arg);
sleep(1);
return NULL;
}
int main()
{
pool_init();
sleep(2);
//连续投入10个任务
int *work_num = (int *)malloc(sizeof(int) * 10);
int i = 0;
for(i = 0; i < 10; i++)
{
work_num[i] = i;
pool_add(process, &work_num[i]);
}
sleep(3);
//销毁线程池
pool_destroy();
free(work_num);
work_num = NULL;
return 0;
}