Redis 命令执行过程分析,彻底解析 Redis 底层原理(图解+秒懂+史上最全)
本文 的 原文 地址原始的内容,请参考 本文 的 原文 地址
本文 的 原文 地址
本文作者:
[*]第一作者老架构师 肖恩(肖恩是尼恩团队 高级架构师,负责写此文的第一稿,初稿 )
[*]第二作者 老架构师 尼恩 (45岁老架构师, 负责提升此文的 技术高度,让大家有一种俯视 技术、俯瞰技术、 技术自由的感觉)
Redis 命令执行过程分析
一条Redis命令的执行流程,本质是"连接建立→处理阶段→返回结果"3个环节 串联,每个环节都由事件机制驱动。
处理阶段 包含2个 子阶段:读取命令→解析执行
理解这个过程,能帮助 大家深入地掌握Redis的性能优化(比如避免内存溢出)、故障排查(比如 慢查询排查、耗时命令定位排查)等实践技能。
Redis命令执行的三大阶段
一条命令从执行到返回数据,主要涉及三个阶段,具体如下:
[*]第一阶段:建立连接阶段:
完成 socket 连接建立,并创建 client 对象;
[*]第二阶段:处理阶段:
从 socket 读取数据到输入缓冲区,解析获得命令后执行,再将返回值存入输出缓冲区;
[*]第三阶段:数据返回阶段:
将输出缓冲区的返回值写入 socket 并返回给客户端,最后关闭 client。
这三个阶段通过事件机制串联。
Redis 启动时会先注册 socket 连接建立事件处理器:
[*]客户端请求建立 socket 连接时,触发对应处理器,完成连接建立后注册 socket 读取事件处理器;
[*]客户端发送命令时,读取事件处理器被触发,执行处理阶段逻辑后注册 socket 写事件处理器;
[*]写事件处理器被触发时,将返回值写回 socket。
Redis命令执行的三个阶段,通过 反应器模式事件机制驱动和串联,
接下来的内容 非常重要, 就是: 反应器模式事件机制
45岁老架构师尼恩提示: 这个是面试的核心重点。
反应器模式(Reactor Pattern)事件机制
反应器模式是一种事件驱动(Event-Driven)的设计模式。
核心目标是高效处理多并发 I/O 操作。
其通过一个 "反应器"(Reactor)组件统一管理事件的注册、监听和分发,将 I/O 事件(如连接建立、数据可读 / 可写)分发给对应的处理器(Handler)处理,避免传统多线程模型中线程上下文切换的开销,提升系统吞吐量。
Reactor Pattern 核心组件
[*]事件(Event):触发处理的信号,如 "连接建立"、"数据可读"、"定时任务触发" 等。
[*]反应器(Reactor):事件的 "总调度中心",负责监听事件(如 I/O 事件、定时事件),并将事件分发给对应的处理器。
[*]事件多路分发器(Event Multiplexer):底层依赖操作系统的 I/O 多路复用机制(如 select、poll、epoll、kqueue),同时监听多个 I/O 句柄(如 socket),当事件触发时通知反应器。
[*]事件处理器(Handler):负责具体的事件处理逻辑(如读取数据、解析请求、发送响应),与反应器解耦。
Redis 单线程 反应器实现
Redis通过单线程高效处理大量客户端的网络请求和定时任务。
Redis 的反应器模式以事件循环(Event Loop) 为核心,单线程循环处理两类事件:文件事件(网络 I/O 事件) 和时间事件(定时任务)。
(1)Redis事件循环流程(非常简单)
伪代码如下:
while (1) {
[*]时间事件距离计算: 计算当前距离下一个时间事件的阻塞时间(避免无意义等待);
[*]IO事件阻塞等待: 调用I/O多路复用器,阻塞等待IO事件(超时时间为步骤1的结果);
[*]处理IO事件:处理所有已就绪的IO事件,按事件类型分发给对应 handler
[*]处理时间事件:处理所有已到期的时间事件;
}
更加细致的Redis事件循环流程 循环:
调用 epoll_wait 等待事件 → 返回就绪事件 → 按事件类型分发给对应 handler。
Redis 三大事件类型,以及对应的处理器如下:
[*]ACCEPT(新链接事件) → acceptTcpHandler → 建立连接并注册 READ 事件;
[*]READ (io读就绪事件)→ readQueryFromClient → 读取-解析-执行命令 → 准备响应 → 注册 WRITE 事件;
[*]WRITE (io写就绪事件)→ sendReplyToClient → 发送完取消写事件。
(2)关键组件实现
[*]事件多路分发器:
Redis 会根据操作系统自动选择最优的 I/O 多路复用器(如 Linux 用 epoll,macOS 用 kqueue),封装为统一的aeApi接口,负责监听 socket 的读 / 写事件。
[*]IO事件处理器:按照IO事件类型,调用对应的处理器,完成事件处理。
[*]时间事件处理器:处理定时任务(如过期键清理、AOF 日志刷盘),通过链表存储时间事件,每次事件循环检查并执行已到期的任务。
Redis 的IO事件 (/文件事件)按事件类型分为三类处理器 (handler):
[*]连接应答处理器:acceptTcpHandler, 处理客户端的新连接请求(对应 socket 的 "可读" 事件,触发accept);
[*]命令请求处理器:readQueryFromClient , 处理客户端发送的命令(读取数据、解析命令);
[*]命令回复处理器:sendReplyToClient , 将命令执行结果返回给客户端(处理 socket 的 "可写" 事件)。
(3)特点
[*]单线程模型:网络I/O + 命令执行 + 响应返回,整个事件循环由一个线程执行,避免锁竞争,内存操作原子性天然保障,简化设计;
[*]事件优先级:文件事件优先于时间事件(时间事件仅在文件事件处理完毕后执行);
[*]高效性:通过 I/O 多路复用器批量处理就绪事件,单线程即可支撑数万并发连接。
关于单线程版本的反应器实现 ,请参见尼恩的《Java高并发核心编程卷1》 ,此书对单线程版本的反应器有通俗易懂的系统化介绍。
Netty 多线程 反应器实现
Netty 是基于 Java NIO 的高性能网络框架,其反应器模式采用多线程架构,通过 "主从 Reactor" 模型实现连接处理与 I/O 读写的分离,支撑更高的并发场景。
Netty 的反应器模式以 EventLoopGroup(事件循环组) 为核心,通过分工明确的多线程处理网络事件。
(1)主从 Reactor 模型
[*]Boss Group(主 Reactor): 负责监听客户端的连接请求(处理OP_ACCEPT事件),建立连接后将 socketChannel 注册到 Worker Group。
[*]Worker Group(从 Reactor): 负责处理已建立连接的 I/O 事件(OP_READ/OP_WRITE),并通过 ChannelPipeline 中的处理器(Handler)处理业务逻辑。
(2)关键组件实现
[*]EventLoopGroup(事件循环组) : 包含一个或者多个 EventLoop
[*]EventLoop:每个 EventLoop 对应一个线程,循环执行事件处理(类似 Redis 的事件循环),包含一个 Selector(Java NIO 的 I/O 多路复用器),负责监听注册到其上的 Channel 的 I/O 事件。
[*]ChannelPipeline:事件处理的 "责任链",包含多个 ChannelHandler,负责解析数据(如编解码)、处理业务逻辑(如请求响应),支持动态添加 / 移除处理器,灵活性极高。
[*]事件分发:当 Selector 检测到 I/O 事件后,EventLoop 会将事件封装为 ChannelEvent,按顺序在 ChannelPipeline 中传递,由对应的 Handler 处理。
(3)Netty事件循环流程(比redis 复杂得多)
Boss 循环监听 OP_ACCEPT事件→ accept → 注册 Channel 到 Worker → Worker 循环监听 OP_READ/OP_WRITE →Pipeline 顺序触发 Handler → 读写完成。
这里比redis 复杂得多, 具体请参见尼恩 讲的 Netty源码视频。
(3)特点
[*]多线程模型:通过 Boss/Worker 分组,分离连接建立与 I/O 处理,充分利用多核 CPU;
[*]异步非阻塞:所有 I/O 操作均为异步,通过 Future/Promise 机制处理结果,避免线程阻塞;
[*]高度可扩展:ChannelPipeline 支持灵活的处理器组合,适配不同协议(如 HTTP、TCP、WebSocket)。
关于多线程版本的反应器实现 ,请参见尼恩的《Java高并发核心编程卷1》 ,此书对多线程版本的反应器有通俗易懂的系统化介绍。
区别:Redis是一个小号的 Netty
反应器模式的核心是 "事件驱动 + I/O 多路复用",但 Redis 和 Netty 因场景不同选择了差异化实现:
[*]Redis 以单线程简化设计,聚焦高效处理内存型请求;
[*]Netty 以多线程主从模型提升并发能力,支撑复杂网络通信。
两者均通过反应器模式最大化了 I/O 处理效率,是各自领域高性能的关键设计。
维度Redis(单线程 Reactor)Netty(主从多线程 Reactor)线程数1 主线程完成 accept+read+write+cmdBoss(1)+Worker(N)+可选业务线程池Reactor 数量单 Reactor主 Reactor + 多 Sub-Reactor并发瓶颈单核 CPU,长命令阻塞Worker 可水平扩展,长任务可下沉业务线程池编程模型纯 C 函数回调,无 Handler 链Pipeline + ChannelHandler 责任链锁机制无锁Worker 之间无锁,Handler 共享数据需同步适用场景极高 QPS 且命令极快(缓存)通用网络框架,长/短连接、大文件、RPC 等Redis 用极简单线程 Reactor换取极致低延迟;Netty 用主从多线程 Reactor换取高并发与可扩展性。
redis,是一个小号的 netty。
接下来,我们逐一分析Redis反应器的 各个步骤的具体原理和代码实现。
Redis启动时监听socket
Redis 服务器启动时会调用 initServer 方法,主要完成三件事:
[*]建立事件机制 eventLoop、
[*]注册周期时间事件处理器(如serverCron),用于定期执行后台操作:清理过期键、统计信息更新等
[*]注册监听 socket 的IO事件处理器(监听连接建立事件,处理函数为 acceptTcpHandler)。
void initServer(void) { // server.c
....
/**
* 创建Redis的事件循环器eventLoop,用于管理所有事件(连接、读写等)
*/
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
/* 打开用于接收用户命令的TCP监听socket */
if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
/**
* 注册周期时间事件处理器(如serverCron)
* 用于定期执行后台操作:清理过期键、统计信息更新等
*/
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/**
* 为所有监听的socket注册文件事件处理器
* 监听"可读事件"(客户端发起连接请求时触发)
* 绑定处理函数acceptTcpHandler,负责建立连接
*/
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd, AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
....
}Redis 的命令执行全程依赖 initServer 中创建的 aeEventLoop 事件循环器。
当 socket 发生对应事件(如连接请求、数据到达)时,aeEventLoop 会自动进行事件分发,调用预先注册的处理器。
第一阶段:建立连接阶段
当客户端向 Redis 建立 socket时,aeEventLoop 会把事件分发到acceptTcpHandler函数 处理。
acceptTcpHandler为每个链接创建一个 Client 对象,并创建相应IO事件来监听socket的可读事件,并指定事件处理函数。
acceptTcpHandler 函数会首先调用 anetTcpAccept方法,它底层会调用 socket 的 accept 方法,也就是接受客户端来的建立连接请求,然后调用 acceptCommonHandler方法, 创建 client 对象,并注册 socket 读事件处理器。
连接处理核心代码
// 客户端建立连接时的事件处理函数(networking.c)
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
....
// 接受客户端连接(底层调用socket的accept)
// anetTcpAccept最终在anet.c的anetGenericAccept中调用accept
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
/**
* 处理连接建立后的逻辑:创建client、检查连接数等
*/
acceptCommonHandler(cfd,0,cip);
}
// 连接建立后的通用处理(networking.c)
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
// 创建client对象(代表一个客户端连接)
c = createClient(fd);
// 检查是否超过最大客户端连接数(配置文件中的maxclients)
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
if (write(c->fd,err,strlen(err)) == -1) {
// 写入失败不处理,后续会关闭连接
}
server.stat_rejected_conn++;
freeClient(c); // 释放client并关闭连接
return;
}
.... // 处理无密码时的默认保护状态
// 更新连接数统计
server.stat_numconnections++;
c->flags |= flags;
}client对象创建
createClient 方法会初始化 client 的属性(如输入/输出缓冲区),配置 socket 为非阻塞模式,设置 NODELAY 和 SOKEEPALIVE标志位来关闭 Nagle 算法并且启动 socket 存活检查机制,并注册读事件处理器(当客户端发送数据时触发)。
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
// fd为-1时用于特殊场景(如执行lua脚本)
if (fd != -1) {
// 配置socket:非阻塞模式(避免IO阻塞)、关闭Nagle算法(减少延迟)、开启保活机制
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
/**
* 向eventLoop注册读事件处理器
* 当客户端通过socket发送数据时,调用readQueryFromClient读取数据
*/
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
// 初始化客户端默认选中的数据库(默认第0个)
selectDb(c,0);
uint64_t client_id;
atomicGetIncr(server.next_client_id,client_id,1);
c->id = client_id;
c->fd = fd;
.... // 初始化其他属性(输入缓冲区querybuf、输出缓冲区等)
return c;
}client 对象包含输入缓冲区(存储客户端发送的数据)和输出缓冲区(存储Redis的响应数据),按类型分为
[*]普通客户端、
[*]从客户端(主从复制用)、
[*]订阅客户端(发布订阅用),
不同类型的缓冲区大小配置不同。
第二阶段:命令处理阶段
读取socket数据到输入缓冲区
客户端发送命令后,socket 触发读事件TriggerRead ,这个事件的CallReadreadQueryFromClient 会将数据读入 client 的输入缓冲区 querybuf,并检查缓冲区大小是否超过限制。
[*]若为普通客户端,直接处理缓冲区数据;
[*]若为主从复制中的主客户端,还需同步命令到从节点。
// 读取客户端发送的数据到输入缓冲区(networking.c)
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
....
// 为输入缓冲区预留空间
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
// 从socket读取数据到querybuf(输入缓冲区)
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
.... // 读取错误,释放client
} else if (nread == 0) {
// 客户端主动关闭连接
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
} else if (c->flags & CLIENT_MASTER) {
/* 若client是主从复制中的主节点,将数据存入pending_querybuf用于同步到从节点 */
c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread);
}
// 更新输入缓冲区的长度
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
// 检查输入缓冲区是否超过配置的最大限制(client-query-buffer-limit)
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c); // 关闭客户端连接
return;
}
if (!(c->flags & CLIENT_MASTER)) {
// 处理普通客户端的输入缓冲区
processInputBuffer(c);
} else {
// 处理主从复制中的主节点客户端
size_t prev_offset = c->reploff;
processInputBuffer(c);
// 若同步偏移量变化,通知从节点更新
size_t applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves, c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}解析获取命令
processInputBuffer 会解析输入缓冲区的数据,根据命令格式调用不同方法解析,最终得到命令参数 argv 和参数个数 argc,再调用 processCommand 执行命令。
执行成功后,如果是主从客户端,还需要更新同步偏移量 reploff 属性
命令格式 主要有:
[*]单行命令 PROTO_REQ_INLINE
[*]批量命令 PROTO_REQ_MULTIBULK
void processInputBuffer(client *c) { // networking.c
server.current_client = c;
/* 循环处理输入缓冲区中的所有数据 */
while(sdslen(c->querybuf)) {
.... // 处理client的状态(如是否阻塞、是否在事务中)
/* 判断命令格式类型(telnet和redis-cli发送的命令格式不同) */
if (!c->reqtype) {
if (c->querybuf == '*') {
c->reqtype = PROTO_REQ_MULTIBULK; // 批量命令格式(如*2\r\n$3\r\nSET\r\n$5\r\nhello\r\n)
} else {
c->reqtype = PROTO_REQ_INLINE; // 单行命令格式(如SET hello world)
}
}
/**
* 解析输入缓冲区数据,得到命令参数argv和参数个数argc
*/
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break; // 解析失败则退出循环
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
/* 参数个数为0时,重置client以接收下一条命令 */
if (c->argc == 0) {
resetClient(c);
} else {
// 执行解析得到的命令
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
// 若为从节点的主客户端,更新同步偏移量
c->reploff = c->read_reploff - sdslen(c->querybuf);
}
// 非阻塞状态下,重置client以接收新命令
if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
resetClient(c);
}
}
}
server.current_client = NULL;
}执行命令 processCommand方法
processCommand 是命令执行的核心逻辑,主要分为三步:
[*]首先是调用 lookupCommand 方法获得对应的 redisCommand;
[*]接着是检测当前 Redis 是否可以执行该命令;
[*]最后是调用 call 方法真正执行命令。
(1) 如果命令名称为 quit,则直接返回,并且设置客户端标志位。
(2) 根据 argv 查找对应的 redisCommand,所有的命令都存储在命令字典 redisCommandTable 中,根据命令名称可以获取对应的命令。
(3) 进行用户权限校验。
(4) 如果是集群模式,处理集群重定向。当命令发送者是 master 或者 命令没有任何 key 的参数时可以不重定向。
(5) 预防 maxmemory 情况,先尝试回收一下,如果不行,则返回异常。
(6) 当此服务器是 master 时:aof 持久化失败时,或上一次 bgsave 执行错误,且配置 bgsave 参数和 stopwritesonbgsaveerr;禁止执行写命令。
(7) 当此服务器时master时:如果配置了 replminslavestowrite,当slave数目小于时,禁止执行写命令。
(8) 当时只读slave时,除了 master 的不接受其他写命令。
(9) 当客户端正在订阅频道时,只会执行部分命令。
(10) 服务器为slave,但是没有连接 master 时,只会执行带有 CMD_STALE 标志的命令,如 info 等
(11) 正在加载数据库时,只会执行带有 CMD_LOADING 标志的命令,其余都会被拒绝。
(12) 当服务器因为执行lua脚本阻塞时,只会执行部分命令,其余都会拒绝
(13) 如果是事务命令,则开启事务,命令进入等待队列;否则调用call直接执行命令。
int processCommand(client *c) {
// 1. 处理QUIT命令(返回OK并标记关闭)
if (!strcasecmp(c->argv->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
/**
* 2. 根据命令名称查找对应的redisCommand结构体
* 所有命令存储在redisCommandTable字典中(如{"get":getCommand, "set":setCommand})
*/
c->cmd = c->lastcmd = lookupCommand(c->argv->ptr);
if (!c->cmd) {
// 处理未知命令(返回错误)
addReplyErrorFormat(c,"unknown command `%s`", (char*)c->argv->ptr);
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) {
// 检查参数个数是否匹配(arity为命令定义的参数要求)
addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name);
return C_OK;
}
// 3. 检查用户认证(若配置requirepass且未认证,仅允许AUTH命令)
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
flagTransaction(c);
addReply(c,shared.noautherr);
return C_OK;
}
/**
* 4. 集群模式下处理命令重定向
* 若命令需在其他节点执行,返回重定向信息(如-MOVED)
*/
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand))
{
int hashslot;
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&hashslot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
// 5. 处理maxmemory限制(尝试回收内存,失败则返回错误)
if (server.maxmemory) {
int retval = freeMemoryIfNeeded();
if (retval == C_ERR) {
addReply(c, shared.oomerr);
return C_OK;
}
}
/**
* 6. 主节点特殊检查:
* - AOF持久化失败或bgsave错误时,禁止写命令
* - 从节点数量不足时(repl-min-slaves-to-write),禁止写命令
*/
if (((server.stop_writes_on_bgsave_err &&
server.saveparamslen > 0 &&
server.lastbgsave_status == C_ERR) ||
server.aof_last_write_status == C_ERR) &&
server.masterhost == NULL &&
(c->cmd->flags & CMD_WRITE ||
c->cmd->proc == pingCommand)) {
addReplySds(c, sdscatprintf(sdsempty(),
"-MISCONF Errors writing to the AOF file: %s\r\n",
server.aof_last_write_errstr ? server.aof_last_write_errstr : "unknown error"));
return C_OK;
}
/**
* 7. 从节点特殊检查:
* - 只读从节点拒绝非主节点的写命令
* - 未连接主节点时,仅允许带CMD_STALE标志的命令(如INFO)
*/
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE) {
addReply(c,shared.readonlyerr);
return C_OK;
}
/**
* 8. 其他检查:
* - 订阅状态的客户端仅允许特定命令(PING、SUBSCRIBE等)
* - 加载数据库时仅允许带CMD_LOADING标志的命令
* - Lua脚本阻塞时仅允许特定命令(AUTH、SHUTDOWN等)
*/
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
return C_OK;
}
/**
* 9. 执行命令:
* - 事务中(CLIENT_MULTI)的命令入队等待EXEC
* - 非事务命令直接调用call执行
*/
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// 事务命令入队
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// 非事务命令直接执行
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return C_OK;
}
// 所有Redis命令的定义(部分示例)
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
{"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
.... // 其他命令
};call方法
call 方法负责实际执行命令,包括通知监视器、调用命令处理函数、记录慢查询日志、命令传播(同步到AOF和从节点)等。
[*]如果有监视器 monitor,则需要将命令发送给监视器。
[*]调用 redisCommand 的proc 方法,执行对应具体的命令逻辑。
[*]如果开启了 CMDCALLSLOWLOG,则需要记录慢查询日志
[*]如果开启了 CMDCALLSTATS,则需要记录一些统计信息
[*]如果开启了 CMDCALLPROPAGATE,则当 dirty大于0时,需要调用 propagate 方法来进行命令传播。
命令传播就是将命令写入 repl-backlog-buffer 缓冲中,并发送给各个从服务器中。
call方法关键源码:
// 执行命令的具体实现
void call(client *c, int flags) {
/**
* dirty:记录数据库修改次数
* start/duration:记录命令执行的开始时间和耗时(微秒)
*/
long long dirty, start, duration;
int client_old_flags = c->flags;
/**
* 若有监视器(monitor),将命令发送给监视器
* 排除从AOF加载的命令和特定管理员命令
*/
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
....
/* 执行命令 */
dirty = server.dirty;
start = ustime();
// 调用命令的处理函数(如setCommand、getCommand)
c->cmd->proc(c);
duration = ustime()-start;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
.... // Lua脚本的特殊处理
/**
* 记录慢查询日志(若命令耗时超过slowlog-log-slower-than)
*/
if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}
/**
* 更新命令统计信息
*/
if (flags & CMD_CALL_STATS) {
c->lastcmd->microseconds += duration;
c->lastcmd->calls++;
}
/**
* 命令传播:若修改了数据库(dirty>0),同步到AOF和从节点
*/
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
/**
* dirty大于0时,需要广播命令给slave和aof
*/
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
....
/**
* 广播命令,写如aof,发送命令到slave
* 也就是传说中的传播命令
*/
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
....
}命令传播会将命令写入 repl-backlog-buffer 缓冲区,并同步到所有从节点,保证主从数据一致性;同时写入AOF文件(若开启),实现持久化。
第三阶段:数据返回阶段
命令执行完成后,结果会被存入Client的输出缓冲区(buf或reply链表)。
当输出缓冲区有数据时,Redis会注册"写事件处理器",将结果通过socket写回客户端。
命令结果写入输出缓冲区
命令执行完成后都会通过 addReply 方法将结果写入输出缓冲区,等待返回给客户端。addReply 的核心逻辑是:准备写入环境,然后将结果写入输出缓冲区(固定缓冲区或链表)。
void addReply(client *c, robj *obj) {
// 准备客户端写入环境(判断是否需要返回结果、加入等待队列等)
if (prepareClientToWrite(c) != C_OK) return;
if (sdsEncodedObject(obj)) {
// 若对象是SDS编码,先尝试写入固定缓冲区
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
// 缓冲区满了,写入响应链表
_addReplyObjectToList(c,obj);
} else if (obj->encoding == OBJ_ENCODING_INT) {
// 整数编码的特殊优化(直接转换为字符串写入)
char buf;
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
if (_addReplyToBuffer(c,buf,len) != C_OK)
_addReplyObjectToList(c,obj);
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}prepareClientToWrite 方法主要负责判断客户端是否需要返回结果,并将客户端加入等待写入队列(clients_pending_write):
int prepareClientToWrite(client *c) {
// Lua脚本或模块的客户端,直接允许写入
if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
// 客户端要求不返回结果(如REPLY OFF),直接拒绝
if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
// 主从复制中的主节点客户端,不需要返回结果
if ((c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
// AOF加载时的临时客户端,不需要返回结果
if (c->fd <= 0) return C_ERR;
// 若客户端不在等待写入队列,加入队列
if (!clientHasPendingReplies(c) &&
!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
}
return C_OK;
}Redis 的输出缓冲区由两部分组成:
[*]固定缓冲区(buf):小数据直接写入,速度快;
[*]响应链表(reply):大数据或缓冲区满时,以节点形式存入链表。
这种设计的好处是:平衡性能和内存效率,小数据用缓冲区减少分配开销,大数据用链表避免缓冲区溢出。
命令返回值从输出缓冲区写入 socket
输出缓冲区中的数据不会立即写入 socket,而是等待 Redis 事件循环处理。具体来说,事件循环每次执行前会调用 beforeSleep 方法,该方法会通过 handleClientsWithPendingWrites 处理等待写入队列中的客户端。
下面的 aeMain 方法就是 Redis 事件循环的主逻辑,可以看到每次循环时都会调用 beforesleep 方法。
void aeMain(aeEventLoop *eventLoop) { // ae.c
eventLoop->stop = 0;
while (!eventLoop->stop) {
/* 如果有需要在事件处理前执行的函数,那么执行它 */
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
/* 开始处理事件*/
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}beforeSleep 函数会调用 handleClientsWithPendingWrites 函数来处理 clientspendingwrite 列表。
// 处理等待写入队列中的客户端
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);// 统计待处理客户端数
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;// 移除等待标记
listDelNode(server.clients_pending_write,ln);// 从队列中移除
// 尝试将输出缓冲区数据写入socket
if (writeToClient(c->fd,c,0) == C_ERR) continue;
// 若数据未写完,注册写事件处理器等待下次写入
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
// AOF实时同步时需要设置屏障确保顺序
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_flags |= AE_BARRIER;
}
// 注册写事件处理器sendReplyToClient
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
return processed;
}
// 实际写入socket的方法
int writeToClient(int fd, client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
sds o;
// 循环写入所有待返回数据
while(clientHasPendingReplies(c)) {
if (c->bufpos > 0) {
// 先写固定缓冲区中的数据
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;// 写入失败或需重试
c->sentlen += nwritten;
totwritten += nwritten;
// 缓冲区数据写完,重置缓冲区
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
// 缓冲区为空,从响应链表取数据
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o);
if (objlen == 0) {
listDelNode(c->reply,listFirst(c->reply));
continue;
}
// 写链表中的数据
nwritten = write(fd, o + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
// 链表节点数据写完,移除节点
if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
c->reply_bytes -= objlen;
}
}
// 限制单次写入字节数,避免阻塞事件循环
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
server.stat_net_output_bytes += totwritten;// 统计输出字节数
if (nwritten == -1) {
// 写入错误,关闭客户端
if (errno != EAGAIN) {
serverLog(LL_VERBOSE,"Error writing to client: %s", strerror(errno));
freeClient(c);
}
return C_ERR;
}
// 所有数据写完,清理资源
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
// 若客户端标记了"回复后关闭",则关闭连接
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
freeClient(c);
return C_ERR;
}
}
return C_OK;
}简单来说,Redis 会先尝试直接将输出缓冲区的数据写入 socket,若一次写不完,则注册写事件处理器,等待操作系统通知 socket 可写时再次写入,直到所有数据发送完成。这种方式既保证了效率,又避免了阻塞事件循环。
set 命令的处理方法: setCommand
前面我们提到,processCommand 方法会从输入缓冲区解析出对应的 redisCommand,然后调用 call 方法执行该命令的 proc 方法。
不同命令的 proc 方法各不相同,比如 set 命令对应的 proc 是 setCommand 方法,get 命令对应的是 getCommand 方法。
这种通过命令找到对应处理函数的方式,和 Java 中的多态思想很相似。
call方法逻辑如下图:
call方法大致源码:
void call(client *c, int flags) {
....
// 调用命令对应的处理函数(如setCommand、getCommand)
c->cmd->proc(c);
....
}
// redisCommand结构体定义
struct redisCommand {
char *name;// 命令名称
redisCommandProc *proc;// 命令处理函数
.... // 其他属性(参数个数、标志位等)
};
// 命令处理函数的类型定义
typedef void redisCommandProc(client *c);
// 所有Redis命令的注册表(部分示例)
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},// get命令对应getCommand
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},// set命令对应setCommand
{"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
.... // 其他命令
};setCommand 会先判断命令是否携带 nx、xx、ex、px 等可选参数,然后调用 setGenericCommand 完成核心逻辑。
我们直接来看 setGenericCommand 方法的处理流程:
setGenericCommand 方法的处理逻辑如下所示:
[*]首先判断 set 的类型是 setnx 还是 setxx,如果是 nx 并且 key 已经存在则直接返回;如果是 xx 并且 key 不存在则直接返回。
[*]调用 setKey 方法将键值添加到对应的 Redis 数据库中。
[*]如果有过期时间,则调用 setExpire 将设置过期时间
[*]进行键空间通知
[*]返回对应的值给客户端。
setGenericCommand 的具体实现代码如下:
// t_string.c
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0;
/**
* 处理过期时间参数:如果存在expire(如ex/px),先解析为毫秒
*/
if (expire) {
// 将expire(robj类型)转换为整数,存储到milliseconds
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
return;
// 过期时间不能小于等于0,否则返回错误
if (milliseconds <= 0) {
addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
return;
}
// 如果单位是秒(UNIT_SECONDS),转换为毫秒
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}
/**
* 处理NX和XX参数:
* - NX(只在key不存在时设置):如果key已存在,返回abort_reply
* - XX(只在key存在时设置):如果key不存在,返回abort_reply
* lookupKeyWrite用于在数据库中查找key(写操作场景)
*/
if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
(flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
{
addReply(c, abort_reply ? abort_reply : shared.nullbulk);
return;
}
/**
* 将键值对存入数据库的dict哈希表
*/
setKey(c->db,key,val);
server.dirty++;// 记录数据库修改次数(用于持久化和复制)
/**
* 如果有过期时间,将过期时间存入数据库的expires哈希表
*/
if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
/**
* 发送键空间通知(如"set"事件,供订阅者感知)
*/
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
"expire",key,c->db->id);
/**
* 向客户端返回结果(如"OK"或自定义回复)
*/
addReply(c, ok_reply ? ok_reply : shared.ok);
}简单来说,setKey 就是把键值对存入 redisDb 的 dict 哈希表,setExpire 则把键和过期时间存入 expires 哈希表,两者共同完成 set 命令的核心存储逻辑。redisDb 的 dict 哈希表如下图:
get 命令的处理方法: getCommand
。。。。。。
由于平台篇幅限制, 此处省略了 1000字,剩下的内容,请参见原文地址
原始的内容,请参考 本文 的 原文 地址
本文 的 原文 地址
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]