Description
生产者消费者共享缓冲区,生产者向缓冲区中放数据,消费者从缓冲取中取数据,当缓冲区中被放满时,生产者进程就必须进入挂起状态,直到消费者从缓冲中取走数据时,生产者才能继续向缓冲区中存放数据,同样当缓冲取中没有数据时,消费者进程就必须进入挂起休眠状态,直到生产者向缓冲区中放入数据时,消费者才能被唤醒继续从缓冲区中取走数据。
分析
- 首先介绍我相对熟悉的sem写法:
用数组并对其下标进行简单的%操作来实现环形数组,用于生产/消费产品。
该写法的核心是定义两个sem,一个用来表示剩余可容纳的产品数(有产品最大值),另一个用于表示当前拥有的产品数。如果产品过多,表示剩余可容纳产品数的信号量则为0,生产者无法再做wait操作,该线程便会阻塞,同理,如果没有再可供消费的产品,表示当前拥有产品数的信号量为0,消费者无法再做wait操作,等待生产者生产后接触阻塞。 - 其次是mutex写法:
本写法采用链表来模拟产品仓库。定义mutex,以及条件变量cond。条件变量cond的意义在于,若消费者发现仓库为空,可以阻塞等待生产者生产(也就是说,生产者每次生产时都要告知阻塞在该条件变量上的线程,你可以接触阻塞了)。
此外,要注意的是:若涉及到多个生产者和消费者时,判断仓库是否为空时需要用while语句,因为broadcast之后如有多个消费者阻塞,却只有一个数据,会发生段错误。
代码
使用mutex:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/types.h>
#include <pthread.h>
#include <string.h>
struct message
{
int num;
struct message *next;
};
void pthread_err(int ret, const char *str)
{
fprintf(stderr, "%s error : %s\n", str, strerror(ret));
exit(1);
}
struct message *head = NULL;
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *consumer(void *arg)
{
while (1)
{
struct message *temp;
pthread_mutex_lock(&mutex);
if (head == NULL) //多个消费者时用while,因为broadcast之后如有多个消费者阻塞,却只有一个数据,会发生段错误
{
//阻塞等待条件变量,并解锁(返回时重新加锁)
pthread_cond_wait(&has_product, &mutex);
}
//模拟消费数据
temp = head;
head = head->next;
pthread_mutex_unlock(&mutex);
printf("-----Consume %d\n", temp->num);
free(temp);
sleep(rand() % 3);
}
return NULL;
}
void *producer(void *arg)
{
while (1)
{
//模拟生产数据
struct message *product = (struct message*)malloc(sizeof(struct message));
product->num = rand()%1000+1;
printf("=====Produce %d\n", product->num);
//加锁(锁的粒度越小越好,提高并发性)
pthread_mutex_lock(&mutex);
//头插法
product->next = head;
head = product;
//解锁
pthread_mutex_unlock(&mutex);
//唤醒阻塞在has_product上的线程
pthread_cond_signal(&has_product);//多个消费者:phread_cond_broadcast(&has_product);
sleep(rand() % 3);
}
return NULL;
}
int main(int argc, char *argv[])
{
pthread_t pid, cid;
srand( (unsigned int)time(NULL));
pthread_create(&pid, NULL, producer, NULL);
pthread_create(&cid, NULL, consumer, NULL);
pthread_join(pid, NULL);
pthread_join(cid, NULL);
return 0;
}
使用sem:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/types.h>
#include <pthread.h>
#include <semaphore.h>
#define NUM 5
int queue[NUM]; //全局数组实现环形队列
sem_t blank_number, product_number; //空格信号量和产品信号量
void my_err(const char *str)
{
perror(str);
exit(1);
}
void *producer(void *arg)
{
int i = 0;
while (1)
{
sem_wait(&blank_number); //生产者将空格子数--,为0则阻塞等待
queue[i] = rand()%1000 + 1;
printf("=====Product %d\n", queue[i]);
sem_post(&product_number); //产品数++
i = (i+1)%NUM;
sleep(rand() % 1); //方便另一个获得CPU
}
}
void *consumer(void *arg)
{
int i = 0;
while (1)
{
sem_wait(&product_number);
printf("-----Consume %d\n", queue[i]);
queue[i] = 0;
sem_post(&blank_number);
i = (i+1)%NUM;
sleep(rand() % 3);
}
}
int main(int argc, char *argv[])
{
pthread_t pid, cid;
sem_init(&blank_number, 0, NUM); //0表示在线程间
sem_init(&product_number, 0, 0); //0表示在线程间
pthread_create(&pid, NULL, producer, NULL);
pthread_create(&cid, NULL, consumer, NULL);
pthread_join(pid, NULL);
pthread_join(cid, NULL);
sem_destroy(&blank_number);
sem_destroy(&product_number);
return 0;
}