文章目录
前言
因为 Redis 的网络模块是一个采用 epoll 的但线程模型, 阅读起来相对更加简单, 就先从这一部分入手
文章主要包括 :
- TCP socket accept 建立连接的过程
- Redis 的 epoll 模型
- 处理时间事件和文件事件的流程
- TCP 数据的读写
- 处理非活动连接
必要数据结构
封装 epoll 的必要成员
typedef struct aeApiState
{
// epoll_event 实例描述符
int epfd;
// 事件槽
struct epoll_event *events;
} aeApiState;
文件事件
typedef struct aeFileEvent
{
// 监听事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE ,
// 或者 AE_READABLE | AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */
// 读事件处理器
aeFileProc *rfileProc;
// 写事件处理器
aeFileProc *wfileProc;
// 多路复用库的私有数据
void *clientData;
} aeFileEvent;
时间事件结构
typedef struct aeTimeEvent
{
// 时间事件的唯一标识符
long long id; /* time event identifier. */
// 事件的到达时间
long when_sec; /* seconds */
long when_ms; /* milliseconds */
// 事件处理函数
aeTimeProc *timeProc;
// 事件释放函数
aeEventFinalizerProc *finalizerProc;
// 多路复用库的私有数据
void *clientData;
// 指向下个时间事件结构,形成链表
struct aeTimeEvent *next;
} aeTimeEvent;
已就绪事件
typedef struct aeFiredEvent
{
// 已就绪文件描述符
int fd;
// 事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE
// 或者是两者的或
int mask;
} aeFiredEvent;
事件处理器的状态 (就是最主要的 aeEventLoop
typedef struct aeEventLoop
{
// 目前已注册的最大描述符
int maxfd; /* highest file descriptor currently registered */
// 目前已追踪的最大描述符
int setsize; /* max number of file descriptors tracked */
// 用于生成时间事件 id
long long timeEventNextId;
// 最后一次执行时间事件的时间
time_t lastTime; /* Used to detect system clock skew */
// 已注册的文件事件
aeFileEvent *events; /* Registered events */
// 已就绪的文件事件
aeFiredEvent *fired; /* Fired events */
// 时间事件
aeTimeEvent *timeEventHead;
// 事件处理器的开关
int stop;
// 多路复用库的私有数据
void *apidata; //我们只分析指向 epoll
// 在处理事件前要执行的函数
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
初始化事件处理器状态
ae.c 中
/*
* 初始化事件处理器状态
*/
aeEventLoop *aeCreateEventLoop(int setsize)
{
aeEventLoop *eventLoop;
int i;
// 创建事件状态结构
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL)
goto err;
// 初始化文件事件结构和已就绪文件事件结构数组
eventLoop->events = zmalloc(sizeof(aeFileEvent) * setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent) * setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL)
goto err;
// 设置数组大小
eventLoop->setsize = setsize;
// 初始化执行最近一次执行时间
eventLoop->lastTime = time(NULL);
// 初始化时间事件结构
eventLoop->timeEventHead = NULL; //定时事件链表置空
eventLoop->timeEventNextId = 0; //定时事件id为0
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
//给EPOLL申请空间
if (aeApiCreate(eventLoop) == -1)
goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
// 初始化监听事件
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
// 返回事件循环
return eventLoop;
err:
if (eventLoop)
{
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
先调用 aeCreateEventLoop 函数, 动态分配内存, 进行初始化
初始化事件处理器其中包含有:
- 文件事件数组 (内部是一个指针, 初始化时动态分配固定内存大小, 形如数组)
- 已就绪文件事件数组 (同上)
- 时间事件链表 (指针置空) Redis 的时间事件是一个单链表, 且无序, 所以查找复杂度为O(n), 每次新的时间事件放入表头
Redis 只使用一个 serverCron 一个时间事件, 在其中执行所有的周期性任务, 所以 redis 几乎将这个无序链表当成指针在用, 没有造成性能上的影响
- void* 指针指向封装的 epoll 结构体, 其中有 epfd 和 epoll_event 数组(也是一个 epoll_event 指针)
返回 eventloop 指针
创建 listenfd 并加入 epoll
初始化完后, 需要调用anetTcpServer
函数(anet.c 中)
int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
}
这个函数中调用_anetTcpServer
去真正调用::socket
函数创建一个listenfd
, 经过 bind listen 后, 调用aeCreateFileEvent
(ae.c) 将其添加进 epoll 的监听事件合集
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize)
{
errno = ERANGE;
return AE_ERR;
}
if (fd >= eventLoop->setsize)
return AE_ERR;
// 取出文件事件结构
aeFileEvent *fe = &eventLoop->events[fd];
// 监听指定 fd 的指定事件, 本质上调用 epoll_ctl
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
// 设置文件事件类型,以及事件的处理器
fe->mask |= mask;
if (mask & AE_READABLE)
fe->rfileProc = proc;
if (mask & AE_WRITABLE)
fe->wfileProc = proc;
// 私有数据
fe->clientData = clientData;
// 如果有需要,更新事件处理器的最大 fd
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
执行主循环
void aeMain(aeEventLoop *eventLoop)
{
eventLoop->stop = 0;
while (!eventLoop->stop)
{
// 如果有需要在事件处理前执行的函数,那么运行它
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 开始处理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
处理事件 aeProcessEvents
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* flag 没有指定监听事件*/
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS))
return 0;
/*这里会调用epoll阻塞直到时间事件到期*/
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT)))
{
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// 获取最近的时间事件
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest)
{
// 如果时间事件存在的话
// 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
long now_sec, now_ms;
// 计算距今最近的时间事件还要多久才能达到
// 并将该时间距保存在 tv 结构中
aeGetTime(&now_sec, &now_ms); //获取当前时间
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) //最近的时间事件还没有到期
{
tvp->tv_usec = ((shortest->when_ms + 1000) - now_ms) * 1000;
tvp->tv_sec--;
}
else
{
tvp->tv_usec = (shortest->when_ms - now_ms) * 1000;
}
// 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
if (tvp->tv_sec < 0)
tvp->tv_sec = 0;
if (tvp->tv_usec < 0)
tvp->tv_usec = 0;
}
else
{
// 执行到这一步,说明没有时间事件
// 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度
//如果flag设置的这个,需要尽快返回,阻塞时间为0
if (flags & AE_DONT_WAIT)
{
// 设置文件事件不阻塞
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}
else
{
//不是的话我们可以阻塞
// 文件事件可以阻塞直到有事件到达为止
tvp = NULL; //为NULL时,epoll_wait的timeout设置为-1,一直等待
}
}
// 处理文件事件,阻塞时间由 tvp 决定
numevents = aeApiPoll(eventLoop, tvp);
//遍历就绪事件数组
for (j = 0; j < numevents; j++)
{
// 从已就绪数组中获取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
// 读事件
if (fe->mask & mask & AE_READABLE)
{
// rfired 确保读/写事件只能执行其中一个
rfired = 1;
//执行读事件回调
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
}
// 写事件
if (fe->mask & mask & AE_WRITABLE)
{
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop, fd, fe->clientData, mask);
}
processed++;
}
}
/* Check time events */
// 执行时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop); //处理所有已到达时间事件(遍历时间链表)
return processed; /* return the number of processed file/time events */
}
该函数flag
默认设置为AE_ALL_EVENTS
, 即处理所有类型事件
如果没有设置不能阻塞立刻返回的AE_DONT_WAIT
标志且没有一个最近要到期的时间事件, epoll
本来会一直阻塞下去,
但是 Redis 默认是有一个周期性检查自身资源和状态的时间事件(上面也提到了, 即serverCron
), 所以会设置epoll
的超时时间为时间事件的到期时间间隔, 到期后执行时间事件
当然, 根据代码能看出来, 如果等待期间有文件事件发生, 会先解决文件事件, 然后再处理时间事件, 所以时间事件的实际执行时间总是稍晚一点
数据读写
Redis 的客户端与服务器交互数据时, 都按照 Redis 定义的协议对格式进行编码, 这样就使消息之间有了 “边界”, 来应对 TCP 协议的流特性
比如说 : 客户端发送SET msg “helloworld”
那么客户端实际发送的数据是:
*3\r\n$3\r\nSET\r\n$3\r\nmsg\r\n$11\r\nhelloworld\r\n
*3
即该命令有3个参数, $3
第一个参数长度为3, 值为SET
, 也就是要执行的命令, 同上第二个参数长度为3, 值为msg
, 第三个参数长度为11, 值为hello world
处理非活动连接
// 检查客户端是否已经超时,如果超时就关闭客户端,并返回 1 ;
// 否则返回 0 。
int clientsCronHandleTimeout(redisClient *c) {
// 获取当前时间
time_t now = server.unixtime;
// 服务器设置了 maxidletime 时间
if (server.maxidletime &&
// 不检查作为从服务器的客户端
!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
// 不检查作为主服务器的客户端
!(c->flags & REDIS_MASTER) && /* no timeout for masters */
// 不检查被阻塞的客户端
!(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
// 不检查订阅了频道的客户端
dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
// 不检查订阅了模式的客户端
listLength(c->pubsub_patterns) == 0 &&
// 客户端最后一次与服务器通讯的时间已经超过了 maxidletime 时间
(now - c->lastinteraction > server.maxidletime))
{
redisLog(REDIS_VERBOSE,"Closing idle client");
// 关闭超时客户端
freeClient(c);
return 1;
} else if (c->flags & REDIS_BLOCKED) {
/* Blocked OPS timeout is handled with milliseconds resolution.
* However note that the actual resolution is limited by
* server.hz. */
// 获取最新的系统时间
mstime_t now_ms = mstime();
// 检查被 BLPOP 等命令阻塞的客户端的阻塞时间是否已经到达
// 如果是的话,取消客户端的阻塞
if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
// 向客户端返回空回复
replyToBlockedClientTimedOut(c);
// 取消客户端的阻塞状态
unblockClient(c);
}
}
// 客户度没有被关闭
return 0;
}
我们发现 Redis 简单粗暴的比较客户端上一次的访问时间, 如果唱过阈值, 就直接断开连接