引子
感觉这东西看过不记一下总会忘,所以手不能懒,及时总结一下。
本文主要针对Redis的服务端模型进行分析,力争能有总体的思路和部分细致的深入。源码版本3.2.8.
正文
Redis服务端一个典型的单线程reactor模型,使用I/O多路复用来完成对文件描述符的监听,然后主线程依次处理就绪的事件。
I/O多路复用
思路非常的简单,首先我们知道I/O多路复用有好几种方式,而这常常是和平台相关的,所以为了实现的简洁,扩展性,跨平台性,Redis在这里进行了一层封装,通过一套统一的API完成整个网络通信部分。
以Linux下的epoll为例,让我们来看一下。
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop);
初始化,主要是分配内存和调用epoll_create生成epoll监听fd,eventLoop结构体是服务端事件驱动的结构体。
static int aeApiResize(aeEventLoop *eventLoop, int setsize);
eventLoop记录了能监控的事件最大数,这里重新调整大小。
static void aeApiFree(aeEventLoop *eventLoop);
释放直接分配给eventLoop的内存空间
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
向eventLoop里面添加要监控的事件,对于epoll来说就是epoll_ctl(EPOLL_CTL_ADD)
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
删除监控的事件,对于epoll来说就是epoll_ctl(EPOLL_CTL_DEL)
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
I/O多路复用的阻塞调用,对于epoll来说就是epoll_wait,第二个参数表示epoll要㩐待的最长时间。
static char *aeApiName(void);
返回封装的I/O多路复用实现,对于epoll来说就是返回“epoll”
而在选择I/O多路复用的实现时,是按照性能的从高到低。
在Redis的网络库ae.c最前面是这样的
#ifdef HAVE_EVPORT
#include "ae_evport.c" // Solaris
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c" // Linux
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c" // BSD
#else
#include "ae_select.c"
#endif
#endif
#endif
自然,跨平台的select作为保留选择,但是由于其性能原因,放在最后了。
两种事件
Redis的服务端通过事件驱动,I/O事件和定时事件,I/O事件我们都很熟悉,便是可读/写或者accept返回的fd,而定时事件的处理之前则没怎么接触,最近也趁机好好学习了《高性能服务端》里面的 定时器 一章。所以在这里顺便展开小说一下定时器。
定时器
定时器里面有两个最基本的属性,即超时时间和处理函数(回调函数),当然可能还有其他的属性,比如是否需要重启等等,一个定时器就是一个定时事件,我们可以通过链表、时间轮、最小堆来组织定时事件。
而在监控定时事件时可以通过信号(定时发信号检测是否到时),I/O多路复用(超时参数)来实现。
服务端事件驱动流程
在服务端启动之后,首先会进行各种初始化工作,在初始化结束之后,便进入aeMain,开始执行服务端的事件循环,等待客户端连接。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0; // 终止flag
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
可以看到这里便是不断调用aeProcessEvents,并且监听两种种类事件(AE_ALL_EVENTS)
事件种类
#define AE_FILE_EVENTS 1 // I/O事件
#define AE_TIME_EVENTS 2 // 定时事件
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
每次aeProcessEvents都是一次I/O多路复用的轮询,让我们来看一下这个核心函数的细节
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
eventLoop就是我们的事件驱动主结构体,flag则记录了需要关心的事件(主循环似乎是会关心所有事件,这里我认为主要体现了Redis网网络库的拓展性,程序也许只需要关心一种事件)。
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS))
return 0;
/*
既然只有两种事件,那么这两种事件都不关心自然是直接结束了(return ASAP--as soon as possible)
*/
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { // 需要关心时间事件,并且不是立即返回
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
/*在最近记录的定时事件中,查找还有最短时间的事件*/
if (shortest) { //找到一个距离到时最近的定时事件
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);//获取现在的时间(秒和毫秒)
tvp = &tv;
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
//计算距离到时还有多少时间
if (ms > 0) { // 如果定时事件还没到达时间,将剩余时间记录在tvp中
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {// 已经有定时事件到时了,那么就不等待,将tvp设置为0
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else { //shortest为NULL,目前没有等待的定时事件
if (flags & AE_DONT_WAIT) { /AE_DONT_WAIT被设置则不需要等待
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
//永远等待直到有事件发生
tvp = NULL;
}
}
经过这一步,我们已经计算出了多路复用需要的超时时间保存在tvp里了。然后就是对监听的fd进行轮询了。
numevents = aeApiPoll(eventLoop, tvp);//返回发生的事件
for (j = 0; j < numevents; j++) {//遍历,处理事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
if (fe->mask & mask & AE_READABLE) {// 可读事件
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);//调用之前添加的处理函数
}
if (fe->mask & mask & AE_WRITABLE) {// 可写事件
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);// 同上
}
processed++;// 成功处理的事件数加1
}
}
// 可以看到,当一个fd 上既可读又可写时是优先处理可读事件的
因为我们刚才设置的传给aeApiPoll的时间是定时事件刚好到时的剩余时间,所以现在定时事件已经到时了,我们还需要再去处理定时事件。
当然,因为我们是处理完所有非定时事件之后,才处理定时事件的,所以实际处理定时事件可能要比预定的时间慢一点。
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed;
让我们再来看processTimeEvents的细节。
首先是一个异常处理,如果系统时间发生错乱被移到未来(moved to the future),这里我们就不关心细节了。
int processed = 0;
aeTimeEvent *te, *prev;
long long maxId;
time_t now = time(NULL); // 获取现在的时间
eventLoop->lastTime = now; // 设置时间
prev = NULL; // 前驱指针
te = eventLoop->timeEventHead;// te就是定时事件链表头
maxId = eventLoop->timeEventNextId-1;//定时事件的最大ID
while(te) {// 主循环
long now_sec, now_ms;
long long id;
if (te->id == AE_DELETED_EVENT_ID) { // 这个事件要被删除
aeTimeEvent *next = te->next;
if (prev == NULL)
eventLoop->timeEventHead = te->next;
else
prev->next = te->next;
if (te->finalizerProc)// 清理资源
te->finalizerProc(eventLoop, te->clientData);
zfree(te);// 释放资源
te = next;
continue;
}
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms)) //如果定时事件到时了(现在的时间大于设定的时间)
{
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData); //调用处理函数处理
processed++; //处理数+1
if (retval != AE_NOMORE) { //如果这个事件之后还要执行,也就是隔一段时间再执行的事件
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); //添加时间
} else {
te->id = AE_DELETED_EVENT_ID;//否则设置删除标志
}
}
prev = te;
te = te->next;//处理下一个事件
}
return processed;// 返回处理数
至此,一次对时间事件处理就结束了,aeProcessEvents也就结束了。
而服务端主要就是一个死循环aeProcessEvents来进行事件处理。
参考阅读
《Redis设计与实现》中的 第12章事件,第14章服务器。
小伙伴Tanswer之前也分析了ae.c的源码,并且还有之前学长利用ae写的demo,请移步 Redis网络库源码浅解