线程池
实现:ThreadPool类
功能:实现固定大小的线程池,用来高效的处理大量任务
知识点:
- 回调函数
通俗的讲,回调函数就是在放在另一个函数中执行的函数,在回调中,我们利用某种方式,把回调函数像参数一样传入中间函数。 - boost::ptr_vector
Boost中的指针容器
threads_.reserve 设置指针容器的大小 - for_each
for_each 算法范围 [first, last) 中的每个元素调用函数 F,并返回输入的参数 f。此函数不会修改序列中的任何元素。
用途:
可用于创建线程,创建多线程程序,可用于创建线程池。
模块拆分与封装:
已把其线程池对象模块单独分离出来,并利用Makefile统一编译,模块代码地址如下:https://github.com/chudongfang/Learn_From_Muduo/tree/master/ThreadPoll
代码及分析:
ThreadPoll.h
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#ifndef MUDUO_BASE_THREADPOOL_H
#define MUDUO_BASE_THREADPOOL_H
#include <muduo/base/Condition.h>
#include <muduo/base/Mutex.h>
#include <muduo/base/Thread.h>
#include <muduo/base/Types.h>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <deque>
namespace muduo
{
class ThreadPool : boost::noncopyable
{
public:
typedef boost::function<void ()> Task;
//构造函数,设置默认线程池名
explicit ThreadPool(const string& nameArg = string("ThreadPool"));
~ThreadPool();
// Must be called before start().
//设置线程池最大线程数量
void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }
//设置线程初始化回调函数
void setThreadInitCallback(const Task& cb)
{ threadInitCallback_ = cb; }
//开启线程池
void start(int numThreads);
//停止线程池
void stop();
//返回线程池名
const string& name() const
{ return name_; }
//返回队列大小
size_t queueSize() const;
// Could block if maxQueueSize > 0
//向任务队列放入任务
void run(const Task& f);
#ifdef __GXX_EXPERIMENTAL_CXX0X__
void run(Task&& f);
#endif
private:
bool isFull() const;//判断线程池是否满
void runInThread(); //子进程函数,在子进程中执行任务
Task take(); //从任务队列中取任务
mutable MutexLock mutex_;//互斥锁,保证数据互斥访问
Condition notEmpty_; //条件变量,是否为空
Condition notFull_; //条件变量,是否满
string name_; //线程名字
Task threadInitCallback_;//线程初始化化回调函数
boost::ptr_vector<muduo::Thread> threads_;//线程队列,利用指针容器实现
std::deque<Task> queue_; //双向队列
size_t maxQueueSize_; //最大队列容量
bool running_; //线程池是否运行
};
}
#endif
ThreadPoll.cc
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#include <muduo/base/ThreadPool.h>
#include <muduo/base/Exception.h>
#include <boost/bind.hpp>
#include <assert.h>
#include <stdio.h>
using namespace muduo;
//构造函数
ThreadPool::ThreadPool(const string& nameArg)
: mutex_(),//初始化锁
notEmpty_(mutex_),//初始化条件变量
notFull_(mutex_),
name_(nameArg), //初始化线程池名
maxQueueSize_(0),
running_(false)
{
}
//析构函数,停止线程池
ThreadPool::~ThreadPool()
{
if (running_)
{
stop();
}
}
void ThreadPool::start(int numThreads)
{
assert(threads_.empty());
running_ = true;
//设置指针容器大小
threads_.reserve(numThreads);
for (int i = 0; i < numThreads; ++i)
{
char id[32];
snprintf(id, sizeof id, "%d", i+1);
//新建立进程,并用bind绑定函数进行传参
threads_.push_back(new muduo::Thread(
boost::bind(&ThreadPool::runInThread, this), name_+id));
//运行线程
threads_[i].start();
}
//初始化回调函数
if (numThreads == 0 && threadInitCallback_)
{
threadInitCallback_();
}
}
void ThreadPool::stop()
{
{
//加互斥锁,当其出当前{}后会自动析构,并解锁
MutexLockGuard lock(mutex_);
running_ = false;
//以广播的形式唤醒等待notEmpty条件变量的线程
notEmpty_.notifyAll();
}
//遍历这个容器,并在每个线程中执行join函数,对线程进程阻塞
for_each(threads_.begin(),
threads_.end(),
boost::bind(&muduo::Thread::join, _1));
}
//返回队列长度
size_t ThreadPool::queueSize() const
{
//MutexLockGuard类对mutex_加锁,当函数结束时,其会自动利用析构函数解锁
MutexLockGuard lock(mutex_);
return queue_.size();
}
//主线程函数
//不断添加task到task队列
void ThreadPool::run(const Task& task)
{
//如果线程池中没有线程,由主线程执行task
if (threads_.empty())
{
task();
}
else
{
MutexLockGuard lock(mutex_);
//如果线程池满,则阻塞等待子线程对task进行处理
while (isFull())
{
notFull_.wait();
}
assert(!isFull());
//向线程队列中添加task
queue_.push_back(task);
//唤醒子线程处理task
notEmpty_.notify();
}
}
#ifdef __GXX_EXPERIMENTAL_CXX0X__
void ThreadPool::run(Task&& task)
{
if (threads_.empty())
{
task();
}
else
{
MutexLockGuard lock(mutex_);
while (isFull())
{
notFull_.wait();
}
assert(!isFull());
queue_.push_back(std::move(task));
notEmpty_.notify();
}
}
#endif
//子线程函数
//从任务队列(queue)中取task
ThreadPool::Task ThreadPool::take()
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup
//子进程等待任务队列中的task
while (queue_.empty() && running_)
{
notEmpty_.wait();
}
Task task;
//从任务队列中取task
if (!queue_.empty())
{
task = queue_.front();
queue_.pop_front();
//唤醒其他子线程来取task,保证了task被挨个取走
if (maxQueueSize_ > 0)
{
notFull_.notify();
}
}
return task;
}
//判断队列是否满
bool ThreadPool::isFull() const
{
mutex_.assertLocked();
return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;
}
//子线程函数
//利用回调函数初始化线程
//从任务队列中取task并执行
//抛出异常
void ThreadPool::runInThread()
{
try
{
if (threadInitCallback_)
{
threadInitCallback_();
}
while (running_)
{
Task task(take());
if (task)
{
task();
}
}
}
catch (const Exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
abort();
}
catch (const std::exception& ex)
{
fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
fprintf(stderr, "reason: %s\n", ex.what());
abort();
}
catch (...)
{
fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
throw; // rethrow
}
}