前言
对于C++的学习,感觉如果只看书,学习效率很低。很多新知识新概念理解起来都很困难,而C++11更是引入了更多新的概念和知识。而在学习服务端这部分,什么“同步–异步”也把人搞得很晕
如果看不懂书,看不懂概念,不如就找个例子分析一下,起码能让你快速上手。
想起了老陈给我们上《操作系统》时讲的要把理论和实例化的东西相结合,对于我这种笨笨的娃尤其适宜,只有理论实在看不懂ToT。
本文主要通过一个实例(半同步–半异步线程池)的源码分析,体会服务端中的并发模式,同时,由于代码几乎完全使用C++11开发,也会针对C++11的新概念进行讲解。
所以本文适合于希望学习C++11/服务端开发的小伙伴。
这个例子出自《深入应用C++11—代码优化与工程级应用》的第9章。源代码已经开源在作者的github上。 原代码地址
正文
同步队列模板类SyncQueue
让我们先看一下私有成员变量:
private:
std::list<T> m_queue; //使用std::list来保存任务
std::mutex m_mutex; //C++11的互斥锁 用于线程同步
std::condition_variable m_notEmpty; //C++11的条件变量,用于保证线程执行的次序
std::condition_variable m_notFull; //同上
int m_maxsize; //队列中允许保存的最大任务数
bool m_needStop; //用于终止队列的标志,默认为false
};
构造函数
//构造函数:这里实现的较为简单,主要是记录了队列允许的最大任务数,同时将终止队列的标志m_needStop设置为false
SyncQueue(int maxsize) : m_maxsize(maxsize), m_needStop(false) {}
让我们先来看最简单的3个public方法,主要使用互斥量std::mutex 来保证对队列访问的互斥
//这三个public方法涉及到队列的计数(size方法),因此在调用时要加锁
//这里使用std::lock_guard()来更加简便
bool Empty() //判断是否为空
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.empty();
}
bool Full() //判断是否为满
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size() == m_maxsize;
}
size_t Size() //得到当前的任务数量
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size();
}
std::mutex的一般用法如下:
#include <mutex>
std::mutex m;
void fun(){
m.lock();
//临界区
m.unlock();
}
这样直接加锁解锁很直观,但是需要我们手动unlock,有时候忘了unlock就尴尬了。所以我们使用简单的std::lock_guard locker(m_mutex) 这个locker在构造时加锁,在析构时解锁,所以我们可以保证这三个函数互斥访问临界区。
两个主要的操作Take & Put
其实Take和Put的流程基本相同,主要使用了条件变量condition_variable来控制线程的执行顺序,保证队列空时Put线程运行,队列满时Take线程运行。
条件变量的使用需要配合锁,并且必须是std::unique_lock
//从任务队列中获取任务,重载的Take主要为了实现批量取任务
//如果参数为std::list<T> 则直接将所有的任务都取出(放到这个list里)
//否则只取出一个
void Take(std::list<T>& list)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); }); // lambda表达式捕获了this指针则可以直接调用private 方法
//如果队列不为空则往下执行,否则阻塞
//如果m_needStop标志为true 则直接结束
if(m_needStop)
return;
list = std::move(m_queue);//通过move移动语义,减少一次复制
m_notFull.notify_one(); //唤醒一个等待Put的线程
}
//重载的Take方法执行流程一致,只是取出队列中最前面的任务
void Take(T& x)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty();});
if(m_needStop)
return ;
x = m_queue.front();
m_queue.pop_front(); //取出任务
m_notFull.notify_one();
}
条件变量的使用方式为
std::mutex m_mutex;
std::condition_varible m_cv;
void func(){
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this]{return isTrue();});
//临界区
m_cv.notify_one();
}
即首先线程竞争获得锁locker,注意必须使用std::unique_lock
然后调用wait方法.这里使用了wait方法的重载版本,首先判断谓词是否为true,若是直接继续.否则,线程阻塞,同时wait会将locker解锁(让其它线程能拿到锁),等待别的线程通过m_cv的notify_*方法将其唤醒.
一旦唤醒,则继续拿锁判断谓词.
wait还有一种版本,不需谓词,直接阻塞解锁,等待notify_*方法唤醒,唤醒后拿锁继续执行.
可以看到wait方法有自动解锁拿锁的过程,因此必须使用std::unique_lock .
//向任务队列中添加任务,使用私有方法Add
//这里Put是采用Add实现的,之所以要这样做就是为了覆盖三种情况(常量左值,非常量左值,右值)
void Put(const T &x)// 参数为常量左值
{
Add(x); //直接传递
}
void Put(T &&x) //参数为左值或者右值
{
Add(std::forward<T>(x)); //使用完美转发,保存参数的类型
}
private:
template <typename F>
void Add(F &&x)
{
//流程同理,通过条件变量保证访问顺序
std::unique_lock<std::mutex> locker(m_mutex);
m_notFull.wait(locker, [this]{return m_needStop || NotFull();});
if(m_needStop)
return;
m_queue.push_back(std::forward<F>(x));//再次完美转发放入队列
m_notEmpty.notify_one();
}
终止操作
//停止所有任务
void Stop()
{
//这里依然是要对队列操作,所以我们依然要加锁
{
std::lock_guard<std::mutex> locker(m_mutex);
m_needStop = true;//设置标志
}
// 这里线程放下了锁(因为一个代码块结束,locker就已经析构,放下了锁)
//这个代码块是为了下面唤醒线程之后,线程能立刻拿到锁退出.
m_notFull.notify_all(); //唤醒正在等待Take和Put的线程,然后所有线程依此拿锁return
m_notEmpty.notify_all();
}
判断队列是否为空/满
private:
//这两个方法由于调用时线程已经拿到锁,所以不用再在计数前加锁了
bool NotFull() const //指定对象为const 防止修改
{
bool full = m_queue.size() >= m_maxsize;
if(full)
std::cout << "buffer is full ...please wait" << std::endl;
else
return !full;
}
bool NotEmpty() const
{
bool empty = m_queue.empty();
if(empty){
std::cout << "buffer is empty... please wait. Thread ID:" << std::this_thread::get_id() << std::endl;
}
return !empty;
}