生产者消费者问题
生产者消费者共享缓冲区,生产者向缓冲区中放数据,消费者从缓冲取中取数据,当缓冲区中被放满时,生产者进程就必须进入挂起状态,直到消费者从缓冲中取走数据时,生产者才能继续向缓冲区中存放数据,同样当缓冲取中没有数据时,消费者进程就必须进入挂起休眠状态,直到生产者向缓冲区中放入数据时,消费者才能被唤醒继续从缓冲区中取走数据。
单生产者,单消费者
因为有单生产者和单消费者,致使在一个缓冲区满时生产者需要等待消费,一个缓冲区空时消费者也要等待生产。
利用两个pthread库中的互斥锁
先定义初始化,利用静态赋值法,以及定义两个互斥锁;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_t thread[2];
pthread_mutex_t mut;
pthread_mutex_t mut2;
先定义全局变量,producer用的time1,consumer用的time2,以及大小为20的缓冲区,以及一个函数t;
#include<stdio.h>
#include<pthread.h>
int buffer[20],t = 0,time1 = 0,time2 = 0;
分为两个函数,一个是生产者的函数;
先判断是否已满,满则加锁;
void producer()
{
while(1)
{
if(time1 == 20)
return;
pthread_mutex_lock(&mut);
判断加锁是否成功,如阻塞则等待,不阻塞则继续;
if(t == 20)
{
printf("缓冲区阻塞,生产者等待。\n");
pthread_mutex_unlock(&mut2);
continue;
}
输出生产者置于多少,增加循环参数,并且解锁。
最后函数为:
void producer()
{
while(1)
{
if(time1 == 20)
return;
pthread_mutex_lock(&mut);
if(t == 20)
{
printf("缓冲区阻塞,生产者等待。\n");
pthread_mutex_unlock(&mut2);
continue;
}
printf("生产者位于%d\n", time1);
t++;
time1++;
pthread_mutex_unlock(&mut2);
}
}
再进行消费者函数;
void consumer()
{
while(1)
{
if(time2 == 20)
return;
pthread_mutex_lock(&mut2);
if(t == 0)
{
printf("缓冲区空了,消费者等待\n");
pthread_mutex_unlock(&mut);
continue;
}
printf("消费者得到%d\n", time2);
t--;
time2;
pthread_mutex_unlock(&mut);
}
}
最后加入主函数;
int main()
{
pthread_create(&thread[0], NULL, (void*)(&producer), NULL);
pthread_create(&thread[1], NULL, (void*)(&consumer), NULL);
sleep(1);
return 0;
}
多生产者,多消费者
先进行头文件和宏定义
//semaphores
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <sys/types.h>
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
#include <unistd.h>
#define BUFFER_SIZE 5
定义semaphore记录有多少个线程正在使用
sem_t empty, full, mutex;
定义缓冲区
int buffer[BUFFER_SIZE];
int in, out;
定义存储数据的结构体
struct data {
int id;
int opTime;
int lastTime;
int productId;
};
定义有限输入
int insert_item(buffer_item item)
{
buffer[out] = item;
out = (out + 1) % BUFFER_SIZE;
return 0;
}
定义有限删除
int remove_item(buffer_item *item)
{
*item = buffer[in];
in = (in + 1) % BUFFER_SIZE;
return 0;
}
接下来是生产者部分的函数
先定义生产者id,始末次数,总id
void *producer(void* param) {
int productId = ((struct data*)param)->productId;
int lastTime = ((struct data*)param)->lastTime;
int opTime = ((struct data*)param)->opTime;
int id = ((struct data*)param)->id;
然后在是释放空间,并为信号量减1
free(param);
sleep(opTime);
sem_wait(&empty);
sem_wait(&mutex);
再做临界段的处理,并为信号量加1
insert_item(productId);
sleep(lastTime);
printf("Thread %d: Producer produce %d\n", id, productId);
sem_post(&mutex);
sem_post(&full);
pthread_exit(0);
}
然后是消费者的函数
void *consumer(void* param) {
int lastTime = (
(struct data*)param)->lastTime;
int opTime = ((struct data*)param)->opTime;
int id = ((struct data*)param)->id;
free(param);
sleep(opTime);
sem_wait(&full);
sem_wait(&mutex);
buffer_item item;
remove_item(&item);
sleep(lastTime);
printf("Thread %d: Consumer consume %d\n", id, item);
sem_post(&mutex);
sem_post(&empty);
pthread_exit(0);
}
最后进行主函数
定义线程标识符,线成属性
int main()
{
pthread_t tid;
pthread_attr_t attr;
再对线程和信号量初始化
pthread_attr_init(&attr);
sem_init(&mutex, 0, 1);
sem_init(&empty, 0, BUFFER_SIZE);
sem_init(&full, 0, 0);
in = out = 0;
再定义生产者和消费者的运行时间和id及创建线程,并运行函数
int id = 0;
while(scanf("%d", &id) != EOF)
{
char role;
id scanf("%c%d%d", &role, &opTime, &lastTime);
struct data* d = (struct data*)malloc(sizeof(struct data));
d->id = id; d->opTime = opTime;
d->lastTime = lastTime;
if(role == 'P')
{
scanf("%d", &productId);
d->productId = productId;
pthread_create(&tid, &attr, producer, d);
}
else if(role == 'C')
pthread_create(&tid, &attr, consumer, d); }
最后释放信号量
sem_destroy(&mutex);
sem_destroy(&empty);
sem_destroy(&full);
return 0;
}sem_destroy(&mutex);
sem_destroy(&empty);
sem_destroy(&full);
return 0;
}