半同步半异步线程池介绍
为了解决任务较多时产生的线程因不断创建销毁而消耗过多系统资源的问题,而实现了线程池技术。
大概思路:
维护一个线程池,里面放有一定数量的线程。当任务到来时选出一个任务“接活”,任务完成后再将其放入池中。
半同步半异步的三层:
- 同步服务层:处理来自上层的任务请求,负责将它们放到同步队列里。
- 同步排队层:来自上层的任务请求都会加到排队层中等待被第三层处理。但必须对大小加以控制,不然如果同步服务层不断添加任务过来,而异步服务层处理不及时,会导致任务过多,内存暴涨等问题。
- 异步服务层:多个线程等待从同步队列取出任务执行。
功能解析图:
代码实现:
同步队列
#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>
using namespace std;
template<typename T>
class SyncQueue
{
public:
/*构造函数*/
SyncQueue( int maxSize ) : m_maxSize( maxSize ), m_needStop( false )
{
}
/*添加事件,左值拷贝和右值引用*/
void Put( const T&x )
{
/*调用private内部接口Add*/
Add( x );
}
void Put( T &&x )
{
Add( std::forward<T>(x) );
}
/*从队列中取事件*/
void Take( T &t)
{
std::unique_lock<std::mutex> locker(m_mutex);
//满足条件则唤醒
m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
if( m_needStop )
return ;
t = m_queue.front();
m_queue.pop_front(); //取出任务
m_notFull.notify_one(); //唤醒阻塞的线程
}
/*取一个线程*/
void Take( std::list<T> &list )
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait( locker, [this]{return m_needStop || NotEmpty(); } );
if(m_needStop)
return ;
list = std::move(m_queue);
m_notFull.notify_one();
}
/*停止所有线程在同步队列中的读取*/
void Stop()
{
{
std::lock_guard<std::mutex> locker(m_mutex);
m_needStop = true;
}
m_notFull.notify_all();
m_notEmpty.notify_all();
}
bool Empty()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.empty();
}
bool full()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size == m_maxSize;
}
size_t Size()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size();
}
int Count()
{
return m_queue.size();
}
private:
bool NotFull() const
{
bool full = m_queue.size() >= m_maxSize;
if( full )
{
cout << "缓冲区满了,需要等待..." << endl;
}
return !full;
}
bool NotEmpty() const
{
bool empty = m_queue.empty();
if( empty )
{
cout << "缓冲区空了,需要等待...异步层线程ID:" << std::this_thread::get_id() << endl;
}
return !empty;
}
template<typename F>
void Add( F &&x )
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notFull.wait( locker, [this]{return m_needStop || NotFull(); } ); //
if( m_needStop )
{
return ;
}
m_queue.push_back(std::forward<F>(x)); //条件满足,像队列添加事件
m_notEmpty.notify_one(); //唤等待的工作线程
}
private:
std::list<T> m_queue; //缓冲区
std::mutex m_mutex; //互斥量和条件变量结合使用
std::condition_variable m_notEmpty; //不为空的条件变量
std::condition_variable m_notFull; //没有满的条件变量
int m_maxSize; //同步队列最大的size
bool m_needStop; //停止的标志
};
线程池
#include <iostream>
#include <list>
#include <thread>
#include <functional>
#include <memory>
#include <atomic>
#include "pthread_queue.cpp"
using namespace std;
const int MaxTaskCount = 100;
class ThreadPool
{
public:
//规定任务类型为void,funciton可为任意函数,即可处理任意任务
using Task = std::function<void()>;
//hardware_concurrency检测硬件性能,给出默认线程数
ThreadPool( int numThreads = std::thread::hardware_concurrency()): m_queue(MaxTaskCount)
{
Start(numThreads);
}
~ThreadPool(void)
{
//如果没有停止则主动停止线程池
Stop();
}
//call_once保证StopThreadGroup只被调用一次
void Stop()
{
std::call_once(m_flag, [this]{StopThreadGroup(); });
}
//普通形式和右值引用的添加任务
//添加很多任务
void AddTask(Task &&task)
{
m_queue.Put(std::forward<Task>(task));
}
//添加一个任务
void AddTaskk(const Task &task)
{
m_queue.Put(task);
}
void Start( int numThreads )
{
m_running = true;
//创建线程组
for( int i=0; i<numThreads; ++i )
{
cout << " Init create thread pool " << endl;
m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
}
}
private:
void RunInThread()
{
while( m_running )
{
std::list<Task> list;
cout << "take\n";
m_queue.Take(list);
for(auto &task : list)
{
if(!m_running)
return ;
task();
}
}
}
void StopThreadGroup()
{
//让同步队列中的线程停止
m_queue.Stop();
m_running = false;
for(auto thread : m_threadgroup)
{
if(thread)
{
thread->join();
}
}
m_threadgroup.clear();
}
private:
std::list<std::shared_ptr<std::thread>> m_threadgroup; //处理任务的线程组
SyncQueue<Task> m_queue; //同步队列
atomic_bool m_running; //是否停止的标志
std::once_flag m_flag;
};
int main( int argc, char *argv[] )
{
ThreadPool pool(2);
std::thread thd1([&pool]{
for( int i=0; i<10; i++ )
{
auto thdId = std::this_thread::get_id();
pool.AddTask([thdId]{
std::cout << thdId << "thread execute task"<< std::endl;
});
}
});
std::this_thread::sleep_for(std::chrono::seconds(2));
pool.Stop();
thd1.join();
return EXIT_SUCCESS;
}
测试结果:
如图,刚开始创建两个异步处理线程360和656,然后创建一个同步层线程不停的往任务对列里加任务,异步层交替的从队列取任务处理。