阅读本文大约需要 19 分钟
Hello 大家好,我是虎珀!
今天跟大家分享「一条 Redis 命令是如何执行的?」,想要用好 Redis,了解它的内部原理必不可少。
只有熟悉 Redis 的内部执行原理,在开发时,我们就能考虑到 Redis 每一个执行步骤,做到胸有成竹。
注:本文源码基于 Redis 6.2
01 Redis 模块与架构
首先,我们来「拆解」Redis。当我们熟悉 Redis 的模块时,定位问题才能直击本质。
先问大家一个问题,你对「 Redis 」的架构认识是怎样的?
这里我总结了 Redis 的核心模块,如下图所示:
从宏观来看,Redis 可以分为单个结点、主从副本、Sentinel(哨兵)、Cluster(集群)。
从微观来看,Redis 内部划分为事件驱动层、命令层、内存分配/回收、RDB/AOF 持久化、监控与统计。
- Redis 客户端:官方提供了 C 语言开发的客户端。除了发送命令外,还支持性能分析、性能测试等。你可以通过 redis-cli -h 来查看。
- 事件驱动层:Redis 基于 IO 多路复用,封装了短小精悍的高性能网络框架 ae。内部集成了 fileEvent(新建连接、读、写事件),timeEvent(时间事件)。
- 命令层:负责执行各种命令。如 GET、SET、LPUSH 等等。
- 内存分配/回收:Redis 基于 jemalloc 提供了快速、低碎片率的内存分配模块。
- RDB 与 AOF:Redis 提供的持久化策略,以保证数据可靠性。
- Replaction 副本:Redis 通过副本,实现「主-从」运行模式,是故障切换的基石,用来提高系统运行可靠性。也支持读写分离,提高性能。
- Sentinel 哨兵:哨兵用来支持故障时,主从结点自动切换。哨兵为 Redis 高可用提供了保障。
- Cluster 集群:Redis 基于数据分片,以支持横向扩展的一种高性能模式。
- 监控与统计:Redis 提供了丰富的监控信息和性能分析工具,包含内存使用、big key 统计、热点 key 统计、基准测试等等。
02 Redis 等于单线程?
你可能在网络上听过一种说法,「Redis 是单线程的」,又或者「Redis 执行命令是单线程的」。
那么,Redis 等于单线程吗?我画了一张图供你参考。
上图可分为 3 个模块
- 主线程和 IO 线程:负责命令读取、解析、结果返回。命令执行由主线程完成。
- bio 线程:负责执行耗时的异步任务。
- 后台进程:fork 子进程来执行耗时的命令。
在 Redis 6 以前,从命令接收到执行,主要由主线程来完成。Redis 6 引入了 IO 多线程。IO 线程功能是接收命令、解析命令、发送结果。
除此之外,Redis 还有后台线程,用来处理耗时的任务,称为 bio 线程家族。bio 线程功能目前有 3 点:
- close fd:关闭文件描述符。
- AOF fsync:fsync 刷盘。
- Lazy Free:异步释放对象内存。
IO 线程和 bio 线程是在 Redis server 启动时初始化的。你可以在源码中找到。
void InitServerLast() {
// 初始化 bio 线程
bioInit();
// 初始化 IO 线程
initThreadedIO();
...
}
IO 线程和 bio 线程通过「生产者-消费者」模型来执行任务。如下图所示
主线程会将「就绪读」、「就绪写」客户端列表,分发到 IO 线程队列 io_threads_list 中。IO 线程通过 IOThreadMain 函数消费。
主线程会将 bio 任务提交到 bio_jobs 任务队列中,由 bio 后台线程通过 bioProcessBackgroundJobs 函数消费。
除这些线程外,Redis 为了避免阻塞主线程,在执行 bgsave、bgrewriteaof 命令时,会通过 fork 子进程来执行。你可以参考以下函数:
// RDB 后台进程任务
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
// AOF 后台进程任务
int rewriteAppendOnlyFileBackground(void);
03 Redis 事件驱动模型
在分析命令执行前,我们先来看看 Redis 最核心的模块——事件驱动。
「事件驱动模型」很常见,可以认为是高性能网络组件的标准。通常,事件驱动分为注册事件、事件触发、处理事件 3 个步骤。
Redis 注册的事件可分为两类:
- fileEvent:网络事件,包括新建连接、读、写事件。
- timeEvent:时间事件,特定时间执行的任务。
其中,新建连接事件在 Redis 启动时注册。当 Redis 收到新建连接请求后,会调用 「acceptTcpHandler」。
void initServer(void) {
// 注册新建连接回调函数 acceptTcpHandler
if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
serverPanic("Unrecoverable error creating TCP socket accept handler.");
}
}
读事件处理函数「readQueryFromClient」,在新建连接时注册 。
写事件处理函数「sendReplyToClient」,在发送执行结果时注册。
// 读事件处理函数。新建连接时注册
connSetReadHandler(conn, readQueryFromClient);
// 写事件处理函数。单次事件循环,无法发完数据时注册
connSetWriteHandler(c->conn, sendReplyToClient)
在 Redis server 启动后,就进入了事件循环「aeMain」。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 事件循环处理函数
// 关注读、写、时间事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
单次事件循环 aeProcessEvents 函数简化后,执行流程如下。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
// 事件触发前执行函数 beforeSleep
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
// 获取触发事件
numevents = aeApiPoll(eventLoop, tvp);
// 事件触发后执行函数 afterSleep
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
// 循环处理事件
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
// 执行读事件回调函数 rfileProc
if (fe->mask & mask & AE_READABLE)
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
// 执行写事件回调函数 wfileProc
if (fe->mask & mask & AE_WRITABLE)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
}
// 时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed;
}
其中「beforeSleep」函数。在每次事件触发前,会执行一些特定功能。
04 一条 Redis 命令是如何执行的?
了解完事件模块,我们来看「一条 Redis 命令是如何执行的」。下图我梳理了执行流程。
首先,客户端发起请求,由 Redis 事件驱动模块 ae 接收。ae 是一个基于 IO 多路复用的 while 无限循环(Linux 下基于 epoll)。
ae 模块在接收连接请求后,会触发「新建连接事件」,由 「acceptTcpHandler」 函数执行。该函数负责接收连接、新建连接,以及初始化 client 数据结构。
你可以参考如下函数调用流程。
上图最后一步「createClient」,会在初始化 client 数据结构的同时,设置读事件回调函数 「readQueryFromClient」。该回调函数是 Redis 执行命令的核心入口。
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
...
if (conn) {
// 设置读事件回调函数 readQueryFromClient
// 接收命令时触发
connSetReadHandler(conn, readQueryFromClient);
}
...
}
「acceptTcpHandler」函数执行完毕后,一条 Redis 客户端连接就创建成功了!
接着我们发起命令
127.0.0.1:6379> SET foo bar
OK
Redis 收到命令后,触发 ae 模块「读事件」,进入「readQueryFromClient」执行流程。该流程判断是否启用 IO 多线程,选择以下两条分支之一。
- 若启用,则主线程将该连接客户端加入「clients_pending_read」读就绪队列,并将客户端 flag 标记为「CLIENT_PENDING_READ」,表示可读。下一次循环时,会将 clients_pending_read 队列分发给 IO 线程和主线程,执行读取请求、解析命令等操作。最终,由主线程执行命令。
- 若未启用,则主线程「独自」执行读取命令、解析命令、执行命令、发送结果等全部流程。
其中,解析命令流程,会解析客户端发来的请求字符串。具体为以下两个步骤。
- 找到命令对应的执行函数,放到 client->cmd->proc 中。
- 解析参数,放到 client->argv、client->argc 中。
Redis 所有命令的执行函数,保存在 「redisCommandTable」 中。SET 命令对应为「setCommand」。
struct redisCommand redisCommandTable[] = {
...
{
"set",setCommand,-3,
"write use-memory @string",
0,NULL,1,1,1,0,0,0},
...
}
接下来,我们重点分析开启 IO 多线程场景。
前面我们提到,每次事件循环,Redis 会执行预处理函数「beforeSleep」,该函数内会将 clients_pending_read 读就绪队列进行分发。具体调用函数如下
int handleClientsWithPendingReadsUsingThreads(void) {
// 未开启 IO 线程,直接返回
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
...
// 否则,分发「读」就绪队列到线程私有队列 io_threads_list[target_id] 中
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
...
// 主线程执行 io_threads_list[0] 任务
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
// 主线程等待其它 IO 线程执行任务
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
while(listLength(server.clients_pending_read)) {
...
// 主线程,执行命令(已读取完成,解析好的命令)。
if (processPendingCommandsAndResetClient(c) == C_ERR) {
continue;
}
...
}
return processed;
}
该函数遍历 clients_pending_read 「读」就绪队列,将「读」任务分发给 IO 线程和主线程的任务队列「io_threads_list」。收到任务后,IO线程和主线程进入「readQueryFromClient」执行流程。注意,本次执行 readQueryFromClient 前,client 状态已被设置为 「CLIENT_PENDING_READ」 ,所以执行时,client 不会再次加入任务队列,而是进入真正的执行流程。
为了方便你理解,我画了一张图供你参考。
上图中「readQueryFromClient」最后两步,通过 connRead 从 socket 读取数据,存放到 client querybuf 中。接着,解析并找到可执行的命令 setCommand。最后,将该 client 标记为 CLIENT_PENDING_COMMAND,表示为可执行状态。
接下来,最重要的一点,由 「主线程独自执行命令」 。执行函数如下
int processPendingCommandsAndResetClient(client *c)
我画了该函数执行流程图供你参考
其中,c->cmd->proc 用来执行真正的命令 setCommand。
执行完命令后,主线程进入最后一步「addReply」,调用 prepareClientToWrite,将执行结果,加入 「clients_pending_write」 写就绪队列中,等待返回客户端。
void addReply(client *c, robj *obj) {
// 加入 clients_pending_write 写就绪队列
if (prepareClientToWrite(c) != C_OK) return;
...
}
在进入下一次事件循环时,beforeSleep 函数,将 clients_pending_write 写就绪队列,分发给 IO 线程和主线程。执行函数如下:
int handleClientsWithPendingWritesUsingThreads(void) {
// 如果开启 IO 线程或者客户端连接很少
// 主线程直接同步发送结果
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
...
// 否则,分发 clients_pending_write 给 IO 线程和主线程执行
while((ln = listNext(&li))) {
int target_id = item_id % server.io_threads_num;
// 添加到线程任务队列
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
...
// 主线程处理分配给自己的任务,这里是同步执行
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 直接发送给客户端
writeToClient(c,0);
}
// 等待 IO 线程执行完毕
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
// 如果同步写数据,没有写完,则注册写事件
// 在下一次事件循环中触发
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 注册写事件
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
}
最终,IO 线程和主线程,通过 「writeToClient」 函数,将命令执行结果发送给客户端。
以上就是「一条 Redis 命令的执行流程」。
05 总结
从上文,我们可以得出结论:
「Redis 执行命令是单线程的,在主线程中执行」
「Redis 6 的 IO 多线程,可以帮助主线程读取数据、解析命令、发送结果」
了解到这一点后,相信你也明白,为什么有些 Redis 规范会说,不要使用时间复杂度高的命令。因为会阻塞主线程,影响到其它命令的执行。
并且 Redis 6 提供的 IO 多线程,可以有效的提高 Redis 单结点性能。如果你使用的 Redis 版本较低,建议升级到 Redis 6 并开启 IO 多线程。
-End-
最后,欢迎大家关注我的公众号「虎珀」。
我会继续写出更好的技术文章。
如果我的文章对你有所帮助,还请帮忙点赞一下啦~