muduo 的定时器功能由三个 class 实现,TimerId、Timer 和 TimerQueue。
TimerId 类
它唯一标识一个 Timer 定时器。TimerId Class 同时保存Timer* 和 sequence_,这个 sequence_ 是每个 Timer 对象有一个全局递增的序列号 int64_t sequence_,用原子计数器(AtomicInt64)生成。
它主要用于注销定时器,这样就可以区分地址相同的先后两个 Timer 对象。
namespace muduo
{
namespace net
{
class Timer;
///
/// An opaque identifier, for canceling Timer.
///
/* 带有唯一标识的Timer,主要用于取消Timer */
class TimerId : public muduo::copyable
{
public:
TimerId()
: timer_(NULL),
sequence_(0)
{
}
TimerId(Timer* timer, int64_t seq)
: timer_(timer), //timer 定时器的指针
sequence_(seq) //seq 该定时任务的序列号
{
}
// default copy-ctor, dtor and assignment are okay
friend class TimerQueue;
private:
Timer* timer_;
int64_t sequence_;
};
}
}
Timer 类
封装了定时器的一些参数,包括超时时间(expiration_)、超时回调函数(callback_)、时间间隔(interval_)、是否重复定时(repeat_)、定时器的序列号等成员变量,成员函数大都是返回这些变量的值,run() 用来调用回调函数,restart() 用来重启定时器。
Timer.h
namespace muduo
{
namespace net
{
///
/// Internal class for timer event.
///
/* 定时器 */
class Timer : boost::noncopyable
{
public:
Timer(const TimerCallback& cb, Timestamp when, double interval)
: callback_(cb),
expiration_(when),
interval_(interval),
repeat_(interval > 0.0),
sequence_(s_numCreated_.incrementAndGet())
{ }
#ifdef __GXX_EXPERIMENTAL_CXX0X__
Timer(TimerCallback&& cb, Timestamp when, double interval)
: callback_(std::move(cb)),
expiration_(when),
interval_(interval),
repeat_(interval > 0.0),
sequence_(s_numCreated_.incrementAndGet())
{ }
#endif
void run() const
{
callback_(); //执行定时器回调函数
}
/* 返回定时器的超时时间戳 */
Timestamp expiration() const { return expiration_; }
/* 是否周期性定时 */
bool repeat() const { return repeat_; }
/* 返回本定时器的序列号 */
int64_t sequence() const { return sequence_; }
/* 重启定时器 */
void restart(Timestamp now);
static int64_t numCreated() { return s_numCreated_.get(); }
private:
const TimerCallback callback_; //超时回调函数
Timestamp expiration_; //超时时间戳
const double interval_; //时间间隔,如果是一次性定时器,该值为0
const bool repeat_; //是否重复执行
const int64_t sequence_; //本定时任务的序号
static AtomicInt64 s_numCreated_; //定时器计数,当前已经创建的定时器数量
};
}
}
Timer.cc
#include <muduo/net/Timer.h>
using namespace muduo;
using namespace muduo::net;
AtomicInt64 Timer::s_numCreated_;
void Timer::restart(Timestamp now)
{
if (repeat_)
{
//如果需要重复,那就将时间设为下次超时的时间
expiration_ = addTime(now, interval_);
}
else
{
//如果不需要重复,那就将超时时间设为一个不可用的 value
expiration_ = Timestamp::invalid();
}
}
TimerQueue 类
定时器队列,用于管理所有的定时器,此类的接口只有两个:添加和注销定时器,分别为 addTimer()
和 cancel()
。
TimerQueue 数据结构的选择。需要高效地组织目前尚未到期的 Timer,能快速地根据当前时间找到已经到期的 Timer,也要能高效地添加和删除 Timer。最终选择了 set < pair<TimeStamp,Timer*> >
,采用 pair 为 key 的原因是可能在一个时刻有多个相同的 Timestamp 时间戳超时,而查找只返回一个,这样即使两个 Timer 的超时时间相同,它们的地址也必须不同。
通过给 timerfd 一个超时时间实现超时计时,它内部有 Channel,通过 Channel 管理 timerfd,然后向EventLoop和 Poller 注册 timerfd 的可读事件,当 timerfd 的可读事件就绪时表明一个超时时间点到了,然后调用 timerfdChannel_ 的可读事件回调 handleRead()
,通过 getExpired()
找出所有的超时事件,然后执行相应的超时回调函数 Timer::run()
。为了复用定时器,每次处理完之后,会检查这些超时定时器是否需要重复定时,如果需要重复,就再次添加到定时器集合中。
timerfd 如何实现多个定时器超时计时的呢?每次向保存定时器的 set 容器插入一个定时器 Timer 的时候就比较 set 的头元素的超时时间,若新插入的超时时间小,则更新 timerfd 的时间,从而保证 timerfd 始终是 set 中最近的一个超时时间。当 timerfd 可读时,需要遍历容器 set,因为可能此时有多个 Timer 超时了(尽管 tiemrfd 是当前最小的超时时间)。这里的关键是采用 timerfd 实现统一事件源。
TimerQueue.h
namespace muduo
{
namespace net
{
class EventLoop;
class Timer;
class TimerId;
///
/// A best efforts timer queue.
/// No guarantee that the callback will be on time.
///
/* 定时器队列 */
class TimerQueue : boost::noncopyable
{
public:
explicit TimerQueue(EventLoop* loop);
~TimerQueue();
///
/// Schedules the callback to be run at given time,
/// repeats if @c interval > 0.0.
///
/// Must be thread safe. Usually be called from other threads.
/* 添加一个定时器 */
TimerId addTimer(const TimerCallback& cb,
Timestamp when,
double interval);
#ifdef __GXX_EXPERIMENTAL_CXX0X__
TimerId addTimer(TimerCallback&& cb,
Timestamp when,
double interval);
#endif
/* 注销一个定时器 */
void cancel(TimerId timerId);
private:
// FIXME: use unique_ptr<Timer> instead of raw pointers.
// This requires heterogeneous comparison lookup (N3465) from C++14
// so that we can find an T* in a set<unique_ptr<T>>.
typedef std::pair<Timestamp, Timer*> Entry; //对应一个定时任务
typedef std::set<Entry> TimerList; //定时任务集合,采用set,有key无value,且有序
typedef std::pair<Timer*, int64_t> ActiveTimer; //下面有解释
typedef std::set<ActiveTimer> ActiveTimerSet;
void addTimerInLoop(Timer* timer); //添加一个定时任务
void cancelInLoop(TimerId timerId); //注销一个定时器
// called when timerfd alarms
void handleRead(); //timerfd 可读 的回调
// move out all expired timers
std::vector<Entry> getExpired(Timestamp now); //获取所有超时的定时器
/* 重置超时的定时器 */
void reset(const std::vector<Entry>& expired, Timestamp now);
bool insert(Timer* timer); //把定时器插到TimerList中
EventLoop* loop_; //TimerQueue 所属的 EventLoop
const int timerfd_; // 内部的 timerfd
Channel timerfdChannel_; //timerfd 对应的Channel,借此来观察timerfd_ 上的readable事件
// Timer list sorted by expiration
TimerList timers_; //所有的定时任务
// for cancel()
// timers_ 与 activeTimers_ 都保存了相同的Timer 地址
// timers_ 是按超时时间排序,activeTimers_ 是按定时器地址排序
ActiveTimerSet activeTimers_;
bool callingExpiredTimers_; /* atomic *///是否处于 处理定时器超时回调中
ActiveTimerSet cancelingTimers_; //保存被注销的定时器
};
}
}
TimerQueue.cc
TimerQueue::TimerQueue(EventLoop* loop)
: loop_(loop),
timerfd_(createTimerfd()),
timerfdChannel_(loop, timerfd_),
timers_(),
callingExpiredTimers_(false)
{
timerfdChannel_.setReadCallback(
boost::bind(&TimerQueue::handleRead, this));//设置timerfd可读事件回调函数为handleRead
// we are always reading the timerfd, we disarm it with timerfd_settime.
timerfdChannel_.enableReading(); //timerfd 注册可读事件
}
TimerQueue::~TimerQueue()
{
timerfdChannel_.disableAll();
timerfdChannel_.remove();
::close(timerfd_);
// do not remove channel, since we're in EventLoop::dtor();
for (TimerList::iterator it = timers_.begin();
it != timers_.end(); ++it)
{
delete it->second; //手动释放Timer*
}
}
/* 添加定时任务,返回此定时器对应的唯一标识 */
TimerId TimerQueue::addTimer(const TimerCallback& cb,
Timestamp when,
double interval)
{
/* new 一个定时器对象 interval 大于0 ,就是需要重复的定时器 */
Timer* timer = new Timer(cb, when, interval);
/*
* runInLoop 的意思是 如果本IO线程想要添加定时器则直接由 addTimerInLoop 添加
* 如果是其他线程向IO线程添加定时器则需要间接通过 queueInLoop添加
*/
loop_->runInLoop(
boost::bind(&TimerQueue::addTimerInLoop, this, timer));
return TimerId(timer, timer->sequence());
}
#ifdef __GXX_EXPERIMENTAL_CXX0X__
TimerId TimerQueue::addTimer(TimerCallback&& cb,
Timestamp when,
double interval)
{
// 右值语义
Timer* timer = new Timer(std::move(cb), when, interval);
loop_->runInLoop(
boost::bind(&TimerQueue::addTimerInLoop, this, timer));
return TimerId(timer, timer->sequence());
}
#endif
/* 注销一个定时器,被EventLoop::cancel(TimerId timerId)调用 */
void TimerQueue::cancel(TimerId timerId)
{
loop_->runInLoop(
boost::bind(&TimerQueue::cancelInLoop, this, timerId));
}
/* IO线程向自己添加定时器 */
void TimerQueue::addTimerInLoop(Timer* timer)
{
loop_->assertInLoopThread();
bool earliestChanged = insert(timer); //如果当前插入的定时器 比队列中的定时器都早 则返回真
if (earliestChanged) //最早的超时时间改变了,就需要重置timerfd_的超时时间
{
resetTimerfd(timerfd_, timer->expiration()); //timerfd_ 重新设置超时时间
}
}
void TimerQueue::cancelInLoop(TimerId timerId)
{
loop_->assertInLoopThread();
assert(timers_.size() == activeTimers_.size());
ActiveTimer timer(timerId.timer_, timerId.sequence_);
ActiveTimerSet::iterator it = activeTimers_.find(timer); //查找该定时器
if (it != activeTimers_.end()) // 找到了
{
/* 从 timers_ 和 activeTimers_ 中删掉*/
size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
assert(n == 1); (void)n;
delete it->first; // FIXME: no delete please //手动 delete
activeTimers_.erase(it);
}
else if (callingExpiredTimers_) //可能正在处理
{
/* 那就先 插入要被注销的定时器 */
cancelingTimers_.insert(timer);
}
assert(timers_.size() == activeTimers_.size());
}
/* timerfd 可读事件的回调函数 */
void TimerQueue::handleRead()
{
loop_->assertInLoopThread();
Timestamp now(Timestamp::now());
readTimerfd(timerfd_, now);
/* 找出所有超时的事件 */
std::vector<Entry> expired = getExpired(now);
callingExpiredTimers_ = true;
cancelingTimers_.clear();
// safe to callback outside critical section
for (std::vector<Entry>::iterator it = expired.begin();
it != expired.end(); ++it)
{
it->second->run(); //执行超时定时器的回调
}
callingExpiredTimers_ = false;
reset(expired, now); //重置定时器,如果不需要再次定时,就删掉,否则再次定时
}
/* 获取队列中超时的定时器 */
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
assert(timers_.size() == activeTimers_.size());
std::vector<Entry> expired; //保存超时定时器的容器
Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX)); //哨兵值
TimerList::iterator end = timers_.lower_bound(sentry); //返回第一个未超时的Timer的迭代器
assert(end == timers_.end() || now < end->first); //均未超时或者找到了
std::copy(timers_.begin(), end, back_inserter(expired)); //把超时的定时器拷贝到 expired 容器中
timers_.erase(timers_.begin(), end); //将超时的定时器从timers_删掉
for (std::vector<Entry>::iterator it = expired.begin();
it != expired.end(); ++it)
{
ActiveTimer timer(it->second, it->second->sequence());
size_t n = activeTimers_.erase(timer); // 将超时的定时器 从 activeTimers_ 删掉
assert(n == 1); (void)n;
}
assert(timers_.size() == activeTimers_.size()); // 都删掉之后 size 应该相同
return expired; //返回超时的那部分定时器
}
/* 已经执行完超时回调的定时任务后,检查这些定时器是否需要重复 */
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
Timestamp nextExpire;
for (std::vector<Entry>::const_iterator it = expired.begin();
it != expired.end(); ++it)
{
ActiveTimer timer(it->second, it->second->sequence());
if (it->second->repeat() // 需要重复 而且 没有要被注销
&& cancelingTimers_.find(timer) == cancelingTimers_.end())
{
/* 将该定时器的超时时间改为下次超时的时间 */
it->second->restart(now);
insert(it->second); //重新插入到定时器容器中
}
else
{
// FIXME move to a free list
// 不需要重复就删除
delete it->second; // FIXME: no delete please
}
}
if (!timers_.empty())
{
/* 获取当前定时器集合中的最早定时器的时间戳,作为下次超时时间*/
nextExpire = timers_.begin()->second->expiration();
}
if (nextExpire.valid())
{
resetTimerfd(timerfd_, nextExpire); //重置 timerfd_ 的超时时间
}
}
/* 向 set 中插入新的定时器 */
bool TimerQueue::insert(Timer* timer)
{
loop_->assertInLoopThread();
assert(timers_.size() == activeTimers_.size());
bool earliestChanged = false; // 最早的超时时间 是否被更改
Timestamp when = timer->expiration(); //新插入 timer 的超时时间
TimerList::iterator it = timers_.begin(); // 当前最早的定时任务
if (it == timers_.end() || when < it->first)
{
earliestChanged = true;
//如果timers_为空,或者when 小于目前最早的定时任务,那么最早的超时时间,肯定需要被改变
}
{
/* 向 timers_ 中插入定时任务 */
std::pair<TimerList::iterator, bool> result
= timers_.insert(Entry(when, timer));
assert(result.second); (void)result;
}
{
/* 向 activeTimers_ 中插入定时任务 */
std::pair<ActiveTimerSet::iterator, bool> result
= activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
assert(result.second); (void)result;
}
/* 插入完成后,两个容器元素数目应该相同 */
assert(timers_.size() == activeTimers_.size());
return earliestChanged; //返回修改标志,表示最近的超时时间已经改变
}
定时器的使用接口
EventLoop 中提供了四个接口来使用定时器,三个是添加定时器,都转而调用 TimerQueue::addTimer();还有一个注销定时器。如下:
/* 在时间戳为 time 的时刻执行,0.0 表示不重复 */
TimerId EventLoop::runAt(const Timestamp& time, const TimerCallback& cb)
{
return timerQueue_->addTimer(cb, time, 0.0);
}
/* 延迟 delay 时间执行 */
TimerId EventLoop::runAfter(double delay, const TimerCallback& cb)
{
Timestamp time(addTime(Timestamp::now(), delay));
return runAt(time, cb);
}
/* 重复定时器,时间间隔为 interval */
TimerId EventLoop::runEvery(double interval, const TimerCallback& cb)
{
Timestamp time(addTime(Timestamp::now(), interval));
return timerQueue_->addTimer(cb, time, interval);
}
/* 注销定时器,直接调用 TimerQueue::cancel() */
void EventLoop::cancel(TimerId timerId)
{
return timerQueue_->cancel(timerId);
}
timerfd 的相关操作
timerfd 是 Linux 为用户程序提供的一个定时器接口,将定时器抽象为文件描述符,通过文件描述符的可读事件进行超时通知,该文件在超时的那一刻变得可读,这样就能完美的融入到 select/poll 框架中,用统一的方式处理 I/O 和定时事件。同时它的时间精度比用 select/poll 的 timeout 更高,timeout 定时精度只有毫秒。
提供了三个 timerfd C API:
#include <sys/timerfd.h>
int timerfd_create(int clockid, int flags);
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value,struct itimerspec *old_value);
int timerfd_gettime(int fd, struct itimerspec *curr_value);
下面看一下 TimerQueue 中 timerfd 的相关操作。
/* 创建 timerfd */
int createTimerfd()
{
int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
TFD_NONBLOCK | TFD_CLOEXEC);
if (timerfd < 0)
{
LOG_SYSFATAL << "Failed in timerfd_create";
}
return timerfd;
}
/* 计算超时时间与当前时间的时间差,并将参数转换为 api 接受的类型 */
struct timespec howMuchTimeFromNow(Timestamp when)
{
/* 微秒数 = 超时时刻微秒数 - 当前时刻微秒数 */
int64_t microseconds = when.microSecondsSinceEpoch()
- Timestamp::now().microSecondsSinceEpoch();
if (microseconds < 100)
{
microseconds = 100;
}
struct timespec ts; //转换成 struct timespec 结构返回
ts.tv_sec = static_cast<time_t>(
microseconds / Timestamp::kMicroSecondsPerSecond);
ts.tv_nsec = static_cast<long>(
(microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
return ts;
}
/* 读timerfd,避免定时器事件一直触发 */
void readTimerfd(int timerfd, Timestamp now)
{
uint64_t howmany;
ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
if (n != sizeof howmany)
{
LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
}
}
/* 重置定时器超时时间 */
void resetTimerfd(int timerfd, Timestamp expiration)
{
// wake up loop by timerfd_settime()
struct itimerspec newValue;
struct itimerspec oldValue;
bzero(&newValue, sizeof newValue);
bzero(&oldValue, sizeof oldValue);
newValue.it_value = howMuchTimeFromNow(expiration);
int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue); //到这个时间后,会产生一个定时事件
if (ret)
{
LOG_SYSERR << "timerfd_settime()";
}
}
以上就是 muduo 定时器的实现,我自己在写 http server 时自己用最小堆实现的定时器,有兴趣的可以看下:
https://github.com/Tanswer/Xserver/blob/master/src/timer.h