如何实现生产者消费者问题
1.实验原理:
-
生产者-消费者问题是典型的PV操作问题,假设系统中有一个比较大的缓冲池,生产者的任务是只要缓冲池未满就可以将生产出的产品放入其中,而消费者的任务是只要缓冲池未空就可以从缓冲池中拿走产品。缓冲池被占用时,任何进程都不能访问。
-
每一个生产者都要把自己生产的产品放入缓冲池,每个消费者从缓冲池中取走产品消费。在这种情况下,生产者消费者进程同步,因为只有通过互通消息才知道是否能存入产品或者取走产品。他们之间也存在互斥,即生产者消费者必须互斥访问缓冲池(即不能有两个以上的进程同时进行)。
-
注意:在消费者和生产者执行任务时两者有互斥关系,当缓冲区的任务被消费者取走后消费者线程就阻塞等待生产者生产任务,这里信号处理需要思考。
2.执行过程:
- 附上过程图如下:
3.代码如下:
- 下面利用一个简单的循环机制来实现对生产者或者消费者的阻塞
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<unistd.h>
#define N 50
#define true 1
#define addNUM 10
#define workerNum 5
int queue[N] = {
0}; //任务缓冲区(初始化为0),实现循环队列
int in = 0; //生产的产品的位置
int out = 0; //消费者的位置
int mutex = 1; //互斥条件
int empty = N; //为空的个数
int full = 0; //为满的个数
int addmutex = 1; //生产者之间的互斥条件
int addfoodsN = 0; //生产产品
int wmutex = 1;
void *add(void *a){
while(true){
// while(addmutex <= 0);
// addmutex--;
addfoodsN++;
printf("生产的产品号为%d,于缓冲区的%d位置\n",addfoodsN,in);
while(empty <= 0){
printf("缓冲区已满\n");
sleep(5);
}
empty--;
while(mutex <= 0); //如果mutex <= 0 说明其他线程正在工作,在这里等待
mutex--;
queue[in] = addfoodsN;
in = (in + 1) % N;
mutex++; //解除互斥条件,让等待的线程继续工作
full++; //满了的缓冲区加一
sleep(2); //在此睡眠1秒结束
}
}
void *worker(void *w){
while(true){
// while(wmutex <= 0);
// wmutex--;
while(full <= 0){
//先判断是否缓冲区为空,若为空,则阻塞
printf("缓冲区为空\n");
sleep(5);
}
full--; //否则将缓冲区的物品拿走一个消费
while(mutex <= 0); //当多个消费线程工作时,在此进行阻塞,一个一个进行工作,以防止数据混乱
mutex--; //当第一个线程开始时,在这里改变互斥条件
int foods = queue[out]; //将消费的产品保存起来
queue[out] = 0; //将原来的缓冲区位置归0来重复利用
out = (out + 1) % N;
mutex++;
empty++;
printf("\t\t\t\t 消费的产品ID为%d,缓冲区位置为%d\n",foods,out);
sleep(2);
}
}
int main(void){
pthread_t threadPool[15];
pthread_mutex_t mutex;
for(int i = 0;i < 10; i++){
pthread_t pt;
if(pthread_create(&pt,NULL,add,NULL) == -1){
printf("failed to create a pthread of add%d\n",i);
exit(1);
}
threadPool[i] = pt; //将生产者添加到线程池中
}
for(int i = 0;i < 5; i++){
pthread_t pt;
if(pthread_create(&pt,NULL,worker,NULL) == -1){
printf("failed to create a pthread of worker%d\n",i);
exit(1);
}
threadPool[i+10] = pt; //将消费者添加到线程在池中
}
void *res; //这里定义一个指针来接收pthread_join返回的值(可加可不加)
for(int i = 0;i < 15; i++){
if(pthread_join(threadPool[i],&res) == -1){
printf("failed to recollect\n");
exit(1);
}
}
return 0;
}