多线程基本知识以及生产者消费者问题
文章目录
生产者消费者共享缓冲区,生产者向缓冲区中放数据,消费者从缓冲取中取数据,当缓冲区中被放满时,生产者线程就必须进入挂起状态,直到消费者从缓冲中取走数据时,生产者才能继续向缓冲区中存放数据,同样当缓冲取中没有数据时,消费者线程就必须进入挂起休眠状态,直到生产者向缓冲区中放入数据时,消费者才能被唤醒继续从缓冲区中取走数据。
锁mutex
使用mutex的基本步骤
- pthread_mutex_t lock;创建锁
- pthread_mutex_init;初始化
- pthread_mutex_lock;加锁
- 访问共享数据(stdout)
- pthread_mutex_unlock();解锁
- pthread_mutex_destroy;销毁锁
注意事项:
尽量保证锁的粒度,越小越好(访问共享数据时,加锁,访问结束立即解锁)
restrict关键字:
用来限定指针变量,被限定的的指针变量所指向的内存操作,必须有本指针完成
使用条件变量实现生产者消费者模型
消费者:
1.创建锁pthread_mutex_t mutex;
2.初始化pthread_mutex_init(&mutex,NULL);
3.加锁pthread_mutex_lock(&mutex);
4.等待条件满足pthread_cond_wait(&cond,&mutex);
1)阻塞等条件变量
2)解锁
3)加锁lock(mutex)
5.访问共享数据
6.解锁 释放条件变量 释放锁
生产者:
1.生产数据
2.加锁pthread_mutex_lock(&mutex)
3.将数据放置公共区域
4.解锁pthread_mutex_unlock(&mutex)
5.通知阻塞在条件变量上的线程
pthread_cond_signal()
pthread_cond_broadcast()
6.循环生产后续数据
条件变量实现生产者消费者模型代码实现
// Single-producer , single-consumer Queue
//借助条件变量模拟生产者消费者问题
#include<stdio.h>
#include<unistd.h>
#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
int i=0;//生产者序号
int j=0;//消费者序号
#define SIZE 10//最大容纳量
// 链表作为共享数据,需被互斥量保护
struct SPSCQueue {
int num;
struct SPSCQueue *next;
int cnt;
} typedef SPSCQueue;
struct SPSCQueue *head;//头指针
//静态初始化一个条件变量和一个互斥量
pthread_cond_t has_product=PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
//生产者
void *producer(void *arg)
{
SPSCQueue *temp;
for(;;)
{
temp=(SPSCQueue)malloc(sizeof(SPSCQueue));
temp->num=rand()%1000+1;//模拟生产一个产品
head->cnt++;
printf("-第%d个Produce %lu------------------------------%d\n",++i,pthread_self(),temp->num);//输出提示信息
pthread_mutex_lock(&lock);//加锁
if(temp->cnt==SIZE){
//队列已满
pthread_cond_wait(&has_product,&lock);//生产者阻塞等待,pthread_cond_wait返回时,重写加锁mutex
}
SPSCQueue *p;
p=head->next;
head->next=temp;
temp->next=p;//头插法入队列
pthread_mutex_unlock(&lock);//解锁
pthread_cond_signal(&has_product);//将等待在该条件变量上的一个线程唤醒
sleep(rand()%5);
}
return NULL;
}
//消费者
void *consumer(void *arg)
{
SPSCQueue *temp;
for(;;)
{
pthread_mutex_lock(&lock);//消费者加锁
//这里是单线程,不需要循环判断
if(head->next==NULL)//多线程阻塞时,可能会出现线程被唤醒但队列为空的情况,需要循环判断
{
pthread_cond_wait(&has_product,&lock);//消费者阻塞等待,pthread_cond_wait返回时,重写加锁mutex
}
temp=head;
while(temp->next->next!=NULL)//!找到尾节点上一个节点
{
temp=temp->next;
}
printf("-第%d个Cosumer %lu-----------------------------%d\n",++j,pthread_self(),temp->next->num);//输出提示信息
head->cnt--;
pthread_mutex_unlock(&lock);//解锁
pthread_cond_signal(&has_product);//唤醒线程
free(temp->next);
temp->next=NULL;//这一步很重要,否则segmentation fault
sleep(rand()%5);
}
return NULL;
}
int main(int argc,char *argv[])
{
head=(SPSCQueue *)malloc(sizeof(SPSCQueue));
head->next=NULL;
head->cnt=0;
pthread_t pid,cid;
srand(time(NULL));
int ret=pthread_create(&pid,NULL,producer,NULL);
if(ret!=0)
{
fprintf(stderr,"pthread_create error:%s\n",strerror(ret));
exit(1);
}
ret=pthread_create(&cid,NULL,consumer,NULL);
if(ret!=0)
{
fprintf(stderr,"pthread_create error:%s\n",strerror(ret));
exit(1);
}
ret=pthread_join(pid,NULL);
if(ret!=0)
{
fprintf(stderr,"pthread_join error:%s\n",strerror(ret));
exit(1);
}
ret=pthread_join(cid,NULL);
if(ret!=0)
{
fprintf(stderr,"pthread_join error:%s\n",strerror(ret));
exit(1);
}
ret=pthread_mutex_destroy(&lock);
if(ret!=0)
{
fprintf(stderr,"pthread_mutex_destroy error:%s\n",strerror(ret));
exit(1);
}
ret=pthread_cond_destroy(&has_product);
if(ret!=0)
{
fprintf(stderr,"pthread_cond_destroy error:%s\n",strerror(ret));
exit(1);
}
return 0;
}
信号量
信号量:
相当于初始值为N的互斥量,N值,表示可以同时访问共享数据区的线程数
函数:
- sem_t sem 定义类型
- int sem_init(sem_t *sem,int pshared,unsigned int value);
参数:
- sem:信号量
- pshared:
- 0:用于线程间同步
- 1:用于进程间同步
- value:N值(指定同时访问的线程数)
int sem_destroy(sem_t *sem);
sem_wait();一次调用做一次–操作,当信号量为0时,再次–就会阻塞
sem_post();一次调用做一次++操作,当信号量为N时,再次++阻塞
使用信号量实现生产者消费者模型
//利用信号量实现单生产者消费者模型
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#include<string.h>
#include<semaphore.h>
#define NUM 5
int i=0;
int j=0;
struct SPSCQueue {
int num;
struct SPSCQueue *next;
int cnt;
} typedef SPSCQueue;
struct SPSCQueue *head;//头指针
sem_t blank_number,product_number;//空格子信号量,产品信号量
//生产者
void *producer(void *arg)
{
SPSCQueue *temp;
for(;;)
{
sem_wait(&blank_number);//生产者将空格数--,为0阻塞等待
temp=malloc(sizeof(SPSCQueue));
temp->num=rand()%1000+1;//模拟生产一个产品
head->cnt++;
printf("-第%d个Produce %lu------------------------------%d\n",++i,pthread_self(),temp->num);//输出提示信息
//pthread_mutex_lock(&lock);//加锁
SPSCQueue *p;
p=head->next;
head->next=temp;
temp->next=p;//头插法入队列
sem_post(&product_number);//将产品数++
sleep(rand()%5);
}
return NULL;
}
//消费者
void *consumer(void *arg)
{
SPSCQueue *temp;
for(;;)
{
sem_wait(&product_number);//消费者将产品数--,为0则阻塞等待
temp=head;
while(temp->next->next!=NULL)//!找到尾节点上一个节点
{
temp=temp->next;
}
printf("-第%d个Cosumer %lu-----------------------------%d\n",++j,pthread_self(),temp->next->num);//输出提示信息
head->cnt--;
sem_post(&blank_number);//消费掉以后,将空格数++
free(temp->next);
temp->next=NULL;//这一步很重要,否则segmentation fault
sleep(rand()%5);
}
return NULL;
}
int main(int argc,char *argv[])
{
head=(SPSCQueue *)malloc(sizeof(SPSCQueue));
head->next=NULL;
head->cnt=0;
pthread_t pid,cid;
srand(time(NULL));
int ret=sem_init(&blank_number,0,NUM);//初始化空格信号量为5,线程间共享--0
if(ret!=0)
{
fprintf(stderr,"pthread_create error:%s\n",strerror(ret));
exit(1);
}
ret=sem_init(&product_number,0,0);//产品数为0
if(ret!=0)
{
fprintf(stderr,"pthread_create error:%s\n",strerror(ret));
exit(1);
}
ret=pthread_create(&pid,NULL,producer,NULL);
if(ret!=0)
{
fprintf(stderr,"pthread_create error:%s\n",strerror(ret));
exit(1);
}
ret=pthread_create(&cid,NULL,consumer,NULL);
if(ret!=0)
{
fprintf(stderr,"pthread_create error:%s\n",strerror(ret));
exit(1);
}
ret=pthread_join(pid,NULL);
if(ret!=0)
{
fprintf(stderr,"pthread_join error:%s\n",strerror(ret));
exit(1);
}
ret=pthread_join(cid,NULL);
if(ret!=0)
{
fprintf(stderr,"pthread_join error:%s\n",strerror(ret));
exit(1);
}
ret=sem_destroy(&blank_number);
if(ret!=0)
{
fprintf(stderr,"sem_destroy error:%s\n",strerror(ret));
exit(1);
}
ret=sem_destroy(&product_number);
if(ret!=0)
{
fprintf(stderr,"sem_destroy error:%s\n",strerror(ret));
exit(1);
}
return 0;
}
多线程的生产者消费者只需要将条件判断里的if改为while即可!