前言
上一篇博客中,我们分析了对于一个线程池中任务队列实现,本文则是对线程池源码分析和一个实例的使用。
正文
const int MaxTaskCount = 100; //任务队列中保存最大任务数,使用const int 而不是 宏 请参考《Effective C++ 》
class ThreadPool{
public:
//这里使用了函数对象作为任务队列中的任务
using Task = std::function<void()>;
//构造函数 默认开启与CPU核数相同的线程数
ThreadPool(int numThreads = std::thread::hardware_concurrency()): M_queue(MaxTaskCount) { Start(numThreads); }
//析构函数直接调用Stop方法
~ThreadPool(void) { Stop();}
void Stop()
{
//call_once保证StopThreadGroup方法只会被多个线程调用一次,而StopThreadGroup才是真正结束线程池的方法
std::call_once(m_flag, [this]{StopThreadGroup();});
}
private:
void Start(int numThreads)
{
m_running = true;
for(int i = 0;i < numThreads; ++i)
//这里构造线程时要执行成员函数RunInThread,所以要传递this指针
m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
}
void StopThreadGroup()
{
M_queue.Stop(); //唤醒在任务队列上阻塞的线程
m_running = false;//设置运行标志,使正在执行任务的任务的线程结束
//依次退出线程
for(auto thread : m_threadgroup)
if(thread)
thread->join();//等待线程结束
//清空线程组队列
m_threadgroup.clear();
}
私有成员变量
private:
//线程组
std::list<std::shared_ptr<std::thread>> m_threadgroup;
SyncQueue<Task> M_queue;//任务队列
std::atomic_bool m_running;//线程池运行标志使用atomic_bool保证原子性
std::once_flag m_flag; //once_call的参数
};
添加任务的public方法
//这里添加任务的方法和任务队列中的Put方法思想相同,重载函数保证参数三种情况(常量左值,左值,右值)
void AddTask(Task&& task)
{
M_queue.Put(std::forward<Task>(task));
}
void AddTask(const Task& task)
{
M_queue.Put(task);
}
线程执行工作函数
private:
void RunInThread()
{
while(m_running){
std::list<Task> List;
M_queue.Take(List);//这里只使用了Take的一种版本
for(auto& task : List){
if(!m_running)
return;
task();
}
}
}
demo实现
一个使用线程池的例子
void testThePool()
{
ThreadPool pool(2);
std::cout << "start" << std::endl;
//开启两个线程用于向任务队列中添加任务
//线程1
std::thread thd1([&pool]{
for(int i = 0; i < 10; ++i){
auto thdId =std::this_thread::get_id(); //获取线程ID
//使用lambda表达式作为函数对象添加
pool.AddTask([thdId]{std::cout << "同步层线程1的线程ID: " << thdId << std::endl;});
}
});
//线程2同上
std::thread thd2([&pool]{
for(int i = 0; i < 10; ++i){
auto thdId =std::this_thread::get_id();
pool.AddTask([thdId]{std::cout << "同步层线程2的线程ID:" << thdId << std::endl;});
}
});
//std::chrono用于计时,这里主线程等待2s 防止子线程任务还没有添加完主线程就直接退出
std::this_thread::sleep_for(std::chrono::seconds(2));
getchar(); //读取一个输入作为终止
pool.Stop(); //终止线程池中的工作线程
thd1.join(); //阻塞当前主线程直到线程1结束
thd2.join(); //同上等待线程2
}
主函数
int main(void)
{
testThePool();
}