C语言实战——生产者消费者问题
方法摘要
生产者消费者共享缓冲区,生产者向缓冲区中放数据,消费者从缓冲取中取数据,当缓冲区中被放满时,生产者进程就必须进入挂起状态,直到消费者从缓冲中取走数据时,生产者才能继续向缓冲区中存放数据,同样当缓冲取中没有数据时,消费者进程就必须进入挂起休眠状态,直到生产者向缓冲区中放入数据时,消费者才能被唤醒继续从缓冲区中取走数据。
简写代码
- 对于生产者:
void producer(void)
{
int item;
while(1)
{
item = produce_item();
if(count == N) //如果缓冲区满就休眠
sleep();
insert_item(item);
count = count + 1; //缓冲区数据项计数加1
if(count == 1)
wakeup(consumer);
}
}
- 这里的函数名只是简写,并没有按照要求,只是把其功能简单表述。
- 对于消费者:
void consumer(void)
{
int item;
while(TRUE)
{
if(count == 0) //如果缓冲区空就休眠
sleep();
item = remove_item();
count = count - 1; //缓冲区数据项计数减1
if(count == N - 1)
wakeup(producer);
consume_item(item);
}
}
- 然而在特殊的情况下,即进程切换时,生产者唤醒消费者的新号可能会丢失,这样会导致生产者和消费者迟早都会休眠,导致死锁。
信号量的引入
- down和up(分别为一般化后的sleep和wakeup)。对一个信号量执行down操作,则是检查其值是否大于0。若该值大于0,则将其减1(即用掉一个保存的唤醒信号)并继续;若该值为0,则进程将睡眠,而且此时down操作并未结束。检查数值、修改变量值以及可能发生的睡眠操作均作为一个单一的、不可分割的原子操作完成。保证一旦一个信号量操作开始,则在该操作完成或阻塞之前,其他进程均不允许访问该信号量。这种原子性对于解决同步问题和避免竞争条件是绝对必要的。所谓原子操作,是指一组相关联的操作要么都不间断地执行,要么不执行。
- up操作对信号量的值增1。如果一个或多个进程在该信号量上睡眠,无法完成一个先前的down操作,则由系统选择其中的一个(如随机挑选)并允许该进程完成它的down操作。于是,对一个有进程在其上睡眠的信号量执行一次up操作后,该信号量的值仍旧是0,但在其上睡眠的进程却少了一个。信号量的值增加1和唤醒一个进程同样也是不可分割的,不会有某个进程因执行up而阻塞,正如前面的模型中不会有进程因执行wakeup而阻塞一样。
- P,V操作的含义:
P(S)
表示申请一个资源,S.value>0
表示有资源可用,其值为资源的数目;S.value=0
表示无资源可用;S.value<0
则他的大小表示S等待队列中的进程个数。V(S)
表示释放一个资源,信号量的初值应大于等于0。P操作相当于“等待一个信号”,V操作相当于“发送一个新号”,在实现互斥过程中,V操作相当于发送一个信号说临界资源可用了。实际上,在实现互斥时,P、V操作相当于申请资源和释放资源。 - 该解决方案使用了三个信号量:一个称为full,用来记录充满缓冲槽数目,一个称为empty,记录空的缓冲槽总数;一个称为mutex,用来确保生产者和消费者不会同时访问缓冲区。full的初值为0,empty的初值为缓冲区中槽的个数,mutex的初值是1。供两个或多个进程使用的信号量,其初值为1,保证同时只有一个进程可以进入临界区,称为二元信号量。如果每个进程在进入临界区前都执行了down操作,并在退出时执行一个up操作,就能够实现互斥。
#define N 100
typedef int semaphore;
semaphore mutex = 1;
semaphore empty = N;
semaphore full = 0;
void producer(void)
{
int item;
while(TRUE)
{
item = produce_item();
down(&empty); //空槽数目减1,相当于P(empty)
down(&mutex); //进入临界区,相当于P(mutex)
insert_item(item); //将新数据放到缓冲区中
up(&mutex); //离开临界区,相当于V(mutex)
up(&full); //满槽数目加1,相当于V(full)
}
}
void consumer(void)
{
int item;
while(TRUE)
{
down(&full); //将满槽数目减1,相当于P(full)
down(&mutex); //进入临界区,相当于P(mutex)
item = remove_item(); //从缓冲区中取出数据
up(&mutex); //离开临界区,相当于V(mutex)
up(&empty); //将空槽数目加1 ,相当于V(empty)
consume_item(item); //处理取出的数据项
}
}
- 信号量的另一种用途是用于实现同步,信号量full和empty用来保证某种事件的顺序发生或不发生。在本例中,它们保证当缓冲区满的时候生产者停止运行,以及当缓冲区空的时候消费者停止运行。
代码实现
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#define N 100
#define true 1
#define producerNum 5
#define consumerNum 5
#define sleepTime 5
typedef int semaphore;
typedef int item;
item buffer[N] = {0};
int in = 0;
int out = 0;
int proCount = 0;
semaphore mutex = 1, empty = N, full = 0, proCmutex = 1;
void * producer(void * a)
{
while(1)
{
while(proCmutex <= 0);
proCmutex--;
proCount++;
printf("生产一个产品ID%d,缓冲区位置为%d\n",proCount,in);
proCmutex++;
while(empty <= 0)
{
printf("缓冲区已满!\n");
}
empty--;
while(mutex <= 0);
mutex--;
buffer[in] = proCount;
in = (in + 1) % N;
mutex++;
full++;
sleep(sleepTime);
}
}
void * consumer(void *b)
{
while(1)
{
while(full <= 0)
{
printf("缓冲区为空!\n");
}
full--;
while(mutex <= 0);
mutex--;
int nextc = buffer[out];
buffer[out] = 0;//消费完将缓冲区设置为0
out = (out + 1) % N;
mutex++;
empty++;
printf(" 消费一个产品ID%d,缓冲区位置为%d\n", nextc,out);
sleep(sleepTime);
}
}
int main()
{
pthread_t threadPool[producerNum+consumerNum];
int i;
for(i = 0; i < producerNum; i++){
pthread_t temp;
if(pthread_create(&temp, NULL, producer, NULL) == -1){
printf("ERROR, fail to create producer%d\n", i);
exit(1);
}
threadPool[i] = temp;
}//创建生产者进程放入线程池
for(i = 0; i < consumerNum; i++){
pthread_t temp;
if(pthread_create(&temp, NULL, consumer, NULL) == -1){
printf("ERROR, fail to create consumer%d\n", i);
exit(1);
}
threadPool[i+producerNum] = temp;
}//创建消费者进程放入线程池
void * result;
for(i = 0; i < producerNum+consumerNum; i++){
if(pthread_join(threadPool[i], &result) == -1){
printf("fail to recollect\n");
exit(1);
}
}//运行线程池
return 0;
}