我们的生产者和消费者,传输数据只是第一步
- 数据怎么来的,耗时吗
- 数据怎么处理,耗时吗
我们还需要对获得的数据添加一个场景 即对他就行任务处理
如生产者放进去了一个任务(x,y±* /)
消费者拿到了这个任务,就要对这些任务进行处理
BlockQueue.hpp
//.hpp 是 c++里面 在开源软件里面使用,声明和定义都可以放在一个文件里面
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include<unistd.h>
namespace ns_blockqueue//使用一个自己的命名空间
{
const int default_cap = 5;
template <class T>//定义一个类模板
class BlockQueue
{
private:
std::queue<T> _bq; //队列,使用了一个类模板
int _cap; //队列的元素上限
pthread_mutex_t _mutex; //保证数据安全
//当生产满了的时候,就不要生产了,不要生产锁了,让消费者来消费
//当消费空了,就不要消费了,让生产了
//这就要有两个条件变量
pthread_cond_t _full; //_bq满的,就要让消费者来消费,空了就要在这个条件变量等待
pthread_cond_t _empty; //_bq空了,要让生产者来生产,生产者在该条件变量进行等待
private:
bool IsFull()
{
return _bq.size() == _cap;
}
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnLockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void WaitProduct()
{
//因为条件变量和互斥锁都搭配使用,我们进入临界资源,我们就是持有锁的
// 1.调用的时候,会首先自动释放锁资源
// 2.然后再挂起自己,这样别人就可以去申请锁
// 3.返回的时候,会首先竞争锁,获取到锁之后,才能返回
pthread_cond_wait(&_full, &_mutex);
}
bool IsEmpty()
{
return _bq.empty();
}
void WaitConsume()
{
//如果一直抱着锁被挂起的话,就会被永远挂起,死锁
pthread_cond_wait(&_empty, &_mutex);
}
void WakeupConsumer()
{
pthread_cond_signal(&_empty);
}
void WakeupProducer()
{
pthread_cond_signal(&_full);
}
public:
BlockQueue(const int cap = default_cap) //带一个缺省参数
: _cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_full, nullptr);
pthread_cond_init(&_empty, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
// const & 输入型参数
//* 输出型参数
//& 输入输出型参数
//只有消费者知道,生产者什么时候可以生产
void Push(const T &in) //向我们的队列中放数据
{
//在访问临界资源的时候,就应该把数据锁住
LockQueue();
//因为生产者消费者之间都是看同一个队列,所以这一把锁就已经够用了
//临界区
// if (IsFull())
//我们需要进行条件检测的时候,这里需要使用循环的方式
//来保证退出循环一定是条件不满足导致的
while (IsFull())
{
//等待,把线程挂起,我们当前是持有锁的,
//如果队列是空的话就不应该生产了,而是在那里等待
//1. 如果我挂起失败呢,因为函数调用有成功有失败
//函数调用失败
//2. 如果我被伪唤醒呢(条件还没有就绪)
//如果是多核多CPU的,很多线程都在条件变量下等待
WaitProduct();
//我醒来之后要再进行一次判断,判断是否为满,判断成功就往下走,
}
//用if判断,有可能当前队列还是满的,再向下走的话,就会插入一个不应该插入的数据
//生产函数
_bq.push(in);
//唤醒消费者,这里我们制定的策略是,有一个就唤醒一个
if(_bq.size()>_cap/2)//改变策略
WakeupConsumer();
UnLockQueue(); //解锁
}
//只有生产者知道,消费者什么时候可以生产
void Pop(T *out) //向队列里面拿数据
{
LockQueue();
// if(IsEmpty())
while(IsEmpty())
{
//无法消费
WaitConsume();
}
//消费函数
*out = _bq.front(); //拿到队头的元素
_bq.pop(); //删除数据
if(_bq.size()<_cap/2)
WakeupProducer();//唤醒生产者,这里的wakeup放在里面和外面都是可以的
UnLockQueue();
}
};
}
Task.hpp
#pragma once
#include <iostream>
#include <pthread.h>
namespace ns_task
{
class Task
{
private:
int _x;
int _y;
char _op; //表示+-*/%
public:
Task() //无参构造,为了拿任务,不需要参数列表
{
}
//进行函数重载
Task(int x, int y, char op) //有参构造,制造任务
: _x(x), _y(y), _op(op)
{
}
~Task()
{
}
int Run()
{
int res = 0;
switch (_op)
{
case '+':
res = _x + _y;
break;
case '-':
res = _x - _y;
break;
case '*':
res = _x * _y;
break;
case '/':
res = _x / _y;
break;
case '%':
res = _x % _y;
break;
default:
std::cout << "bug?" << std::endl;
break;
}
std::cout << "当前任务正在被:" << pthread_self() << "处理:" << _x << _op << _y << "=" << res << std::endl;
return res;
}
Task operator=(Task &s)
{
if (this != &s)
{
_x = s._x;
_y = s._y;
_op = s._op;
}
return *this;
}
int operator()()//重载一个函数
{
return Run();
}
};
}
PC.cpp
#include"BlockQueue.hpp"
#include"Task.hpp"
using namespace ns_blockqueue;
#include<cstdlib>
#include<ctime>
using namespace ns_task;
void* consumer(void* args)
{
//两个角色已经是有了
BlockQueue<Task>* bq=(BlockQueue<Task>*)args;//拿到了了阻塞队列
while(true)
{
// sleep(2);
// int data=0;//我们想把数据拿出来
// bq->Pop(&data);
// //data就是一个输出型参数
// std::cout<<"消费者消费了一个数据:"<<data<<std::endl;
//消费者这里需要获得任务,无参构造就行了
Task t;
bq->Pop(&t);//任务拿出来了,消费的第一步
//拿出来之后我们就要对任务进行处理
// t.Run();
t();//拿到任务,直接返回,消费的第二步
}
}
void* producer(void* args)
{
BlockQueue<Task>* bq=(BlockQueue<Task>*)args;//拿到了了阻塞队列
std::string ops="+-*/%";
while(true)
{
// sleep(2);//按照生产者的节奏来走
//我生产一条他消费一条
//制造数据
//生产者的数据(task)哪里来呢
// int data=rand()%20+1;
// std::cout<<"生产者生产数据:"<<data<<std::endl;
// bq->Push(data);
//1. 生产一个任务
sleep(1);
int x=rand()%20+1;
int y=rand()%10+1;
char op=ops[rand()%5];//0-4之间,进行操作运算
Task t(x,y,op);
std::cout<<"生产派发了一个任务"<<x<<op<<y<<"="<<"?"<<std::endl;
//2.把数据放到队列里面
bq->Push(t);//把任务塞进去了
}
}
int main()
{
srand((long long)time(nullptr));//种了一个随机数种子
BlockQueue<Task>* bq=new BlockQueue<Task>(6);//我们动态开辟的一个空间,因为引入了模板,所以这里我们对他进行实例化一下顺便初始化一下
pthread_t c,p;
pthread_t c1,c2,c3,c4;
pthread_create(&c,nullptr,consumer,(void*)bq);//把阻塞队列传进去就可以看到同一个阻塞队列了
pthread_create(&c1,nullptr,consumer,(void*)bq);//把阻塞队列传进去就可以看到同一个阻塞队列了
pthread_create(&c2,nullptr,consumer,(void*)bq);//把阻塞队列传进去就可以看到同一个阻塞队列了
pthread_create(&c3,nullptr,consumer,(void*)bq);//把阻塞队列传进去就可以看到同一个阻塞队列了
pthread_create(&c4,nullptr,consumer,(void*)bq);//把阻塞队列传进去就可以看到同一个阻塞队列了
pthread_create(&p,nullptr,producer,(void*)bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
delete bq;
return 0;
}