在看这篇文章之前,需要先对 linux 的 epoll 模型有所了解。请参考:
Redis内置了一个高性能事件循环器,叫做AE。其定义和实现可以在ae*.h/cpp这些文件中找到。
事件驱动模型
Redis 的事件驱动模型处理 2 类事件:
- 文件事件,如连接建立、接受请求命令、发送响应等;
- 时间事件,如 Redis 中定期要执行的统计、key 淘汰、缓冲数据写出、rehash等。
一、文件事件处理
Redis 的文件事件采用典型的 Reactor 模式进行处理。Redis 文件事件处理机制分为 4 部分:
- 连接 socket
- IO 多路复用程序
- 文件事件分派器
- 事件处理器
虽然多个文件事件可能会并发出现,但 IO 多路复用程序总会将所有产生事件的 socket 放入一个队列中,通过这个队列,有序的把这些文件事件通知给文件分派器。
1. IO多路复用
Redis 封装了 4 种多路复用程序,每种封装实现都提供了相同的 API 实现。编译时,会按照性能和系统平台,选择最佳的 IO 多路复用函数作为底层实现,选择顺序是,首先尝试选择 Solaries 中的 evport,如果没有,就尝试选择 Linux 中的 epoll,否则就选择大多 UNIX 系统都支持的 kqueue,这 3 个多路复用函数都直接使用系统内核内部的结构,可以服务数十万的文件描述符。
如果当前编译环境没有上述函数,就会选择 select 作为底层实现方案。Redis 的这 4 种实现,分别在 ae_evport、ae_epoll、ae_kqueue 和 ae_select 这 4 个代码文件中。
2. 文件事件收集及派发器
Redis 中的文件事件分派器是 aeProcessEvents 函数。它会首先计算最大可以等待的时间,然后利用 aeApiPoll 等待文件事件的发生。如果在等待时间内,一旦 IO 多路复用程序产生了事件通知,则会立即轮询所有已产生的文件事件,并将文件事件放入 aeEventLoop 中的 aeFiredEvents 结构数组中。每个 fired event 会记录 socket 及 Redis 读写事件类型。
这里会涉及将多路复用中的事件类型,转换为 Redis 的 ae 事件驱动模型中的事件类型。以采用 Linux 中的 epoll 为例,会将 epoll 中的 EPOLLIN 转为 AE_READABLE 类型,将 epoll 中的 EPOLLOUT、EPOLLERR 和 EPOLLHUP 转为 AE_WRITABLE 事件。
aeProcessEvents 在获取到触发的事件后,会根据事件类型,将文件事件 dispatch 派发给对应事件处理函数。如果同一个 socket,同时有读事件和写事件,Redis 派发器会首先派发处理读事件,然后再派发处理写事件。
3. 文件事件处理函数分类
Redis 中文件事件函数的注册和处理主要分为 3 种。
● 连接处理函数 acceptTcpHandler
Redis 在启动时,在 initServer 中对监听的 socket 注册读事件,事件处理器为 acceptTcpHandler,该函数在有新连接进入时,会被派发器派发读任务。在处理该读任务时,会 accept 新连接,获取调用方的 IP 及端口,并对新连接创建一个 client 结构。如果同时有大量连接同时进入,Redis 一次最多处理 1000 个连接请求。
● readQueryFromClient 请求处理函数
连接函数在创建 client 时,会对新连接 socket 注册一个读事件,该读事件的事件处理器就是 readQueryFromClient。在连接 socket 有请求命令到达时,IO 多路复用程序会获取并触发文件事件,然后这个读事件被派发器派发给本请求的处理函数。readQueryFromClient 会从连接 socket 读取数据,存入 client 的 query 缓冲,然后进行解析命令,按照 Redis 当前支持的 2 种请求格式,及 inline 内联格式和 multibulk 字符块数组格式进行尝试解析。解析完毕后,client 会根据请求命令从命令表中获取到对应的 redisCommand,如果对应 cmd 存在。则开始校验请求的参数,以及当前 server 的内存、磁盘及其他状态,完成校验后,然后真正开始执行 redisCommand 的处理函数,进行具体命令的执行,最后将执行结果作为响应写入 client 的写缓冲中。
● 命令回复处理器 sendReplyToClient
当 redis需要发送响应给client时,Redis 事件循环中会对client的连接socket注册写事件,这个写事件的处理函数就是sendReplyToClient。通过注册写事件,将 client 的socket与 AE_WRITABLE 进行间接关联。当 Client fd 可进行写操作时,就会触发写事件,该函数就会将写缓冲中的数据发送给调用方。
二、 时间事件处理
Redis 中的时间事件是指需要在特定时间执行的事件。多个 Redis 中的时间事件构成 aeEventLoop 中的一个链表,供 Redis 在 ae 事件循环中轮询执行。
Redis 当前的主要时间事件处理函数有 2 个:
● serverCron
● moduleTimerHandler
Redis 中的时间事件分为 2 类:
● 单次时间,即执行完毕后,该时间事件就结束了。
● 周期性事件,在事件执行完毕后,会继续设置下一次执行的事件,从而在时间到达后继续执行,并不断重复。
时间事件主要有 5 个属性组成。
● 事件 ID:Redis 为时间事件创建全局唯一 ID,该 ID 按从小到大的顺序进行递增。
● 执行时间 when_sec 和 when_ms:精确到毫秒,记录该事件的到达可执行时间。
● 时间事件处理器 timeProc:在时间事件到达时,Redis 会调用相应的 timeProc 处理事件。
● 关联数据 clientData:在调用 timeProc 时,需要使用该关联数据作为参数。
● 链表指针 prev 和 next:它用来将时间事件维护为双向链表,便于插入及查找所要执行的时间事件。
时间事件的处理是在事件循环中的 aeProcessEvents 中进行。执行过程是:
- 首先遍历所有的时间事件。
- 比较事件的时间和当前时间,找出可执行的时间事件。
- 然后执行时间事件的 timeProc 函数。
- 执行完毕后,对于周期性时间,设置时间新的执行时间;对于单次性时间,设置事件的 ID为 -1,后续在事件循环中,下一次执行 aeProcessEvents 的时候从链表中删除。
源码解析
1. 创建 eventLoop
我们从 Redis 的启动开始看起(server.c)(本节我们重点关注 ae 事件模型,其他代码先忽略掉)
void initServer(void) {
......
// 处理 signal,这里的 signal 有两大类:
// 1. kill 命令:通过 serverCron 优雅的关闭程序,而不是强制退出。
// 2. 严重错误:主要是记录现场,关闭程序。
setupSignalHandlers();
......
// 创建事件循环
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
......
// 监听端口
if (server.port != 0 &&
listenToPort(server.port,&server.ipfd) == C_ERR) {
serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
exit(1);
}
......
// 创建 时间事件,处理后台任务 serverCron,比如客户端超时、淘汰过期数据等
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
// 创建 文件事件处理函数,用于监听连接
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
serverPanic("Unrecoverable error creating TCP socket accept handler.");
}
if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
serverPanic("Unrecoverable error creating TLS socket accept handler.");
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
......
}
我们先看下 eventloop 的结构:
typedef struct aeEventLoop {
/* 注册进 eventloop 的最大的 fd */
int maxfd;
/* 最多可以持有多少个文件描述符 */
int setsize;
long long timeEventNextId;
/* 注册进 eventloop 的文件事件,数组,大小等于 setsize
aeFileEvent *events;
/* 当前已触发的事件 */
aeFiredEvent *fired;
// 链表,处理事件的时候,从表头开始遍历
aeTimeEvent *timeEventHead;
// eventloop 的运行状态
int stop;
/* This is used for polling API specific data */
// 根据多路复用的 api 不同,具体结构有所不同,对于 linux 的 epoll,存放着 epfd 和 epoll_event
void *apidata;
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop;
这里我们看下 aeCreateEventLoop(ae.c)方法:
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
monotonicInit(); /* just in case the calling app didn't initialize */
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
// 时间事件链表头
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
// 停止标识符,表示 eventLoop 的运行状态,0为运行,1为停止
eventLoop->stop = 0;
eventLoop->maxfd = -1;
// 进入poll前的回调函数
eventLoop->beforesleep = NULL;
// 进入poll后的回调函数
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
// 根据选择的ae底层原理不同,aeApiCreate会有不同的实现;
// 在Linux中使用epoll进行实现(ae_epoll.c),主要工作是 将 aeEventLoop->apiData
// 指向 epfd 和 epoll_event
if (aeApiCreate(eventLoop) == -1) goto err;
......
return eventLoop;
}
aeApiCreate 针对不同的操作系统有不同的实现。比如 Unix 中是 ae_kqueue.c,这里我们看下 Linux 中的 ae_epoll.c,其中就调用了 epoll_create 方法
static int aeApiCreate(aeEventLoop *eventLoop) {
......
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
......
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
......
return 0;
}
ae_epoll.c 中还有一些方法,都是对底层 epoll 的封装。事实上,不管那种 IO 复用方式,只要实现下面这些接口,就可以正常的对接 Redis 了
int aeApiCreate(aeEventLoop *eventLoop);
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
void aeApiResize(aeEventLoop *eventLoop, int setsize);
void aeApiFree(aeEventLoop *eventLoop);
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
char *aeApiName(void);
创建完 eventLoop 之后,紧接着创建了监听器,用于监听 TCP 连接;createSocketAcceptHandler 这个方法内部实际上是:
- 调用了 epoll 的 epoll_ctl 方法,添加了一个 fd
- 定义了一个 acceptTcpHandler;用于处理连接请求:底层 epoll 将事件传递过来之后,用来连接用户 client
2. 启动 eventLoop
回到主线,在执行完 initServer 之后,会调用 ae.c 中的 aeMain 方法,开启事件循环:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
这里 aeProcessEvents 的第二个参数
AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP
并不是要处理的事件本身,而是表示要处理哪些类型的事件。如果当前空闲,暂时没有事情要处理,aeProcessEvents 方法会一直阻塞在 epoll_wait 上,所以这里的 while (!eventLoop->stop) 并不会导致 cpu 跑飞。
3. ae 事件模型的核心模块 - 处理事件
aeProcessEvents 的核心处理逻辑条件判断比较复杂,我感觉写的不是很好,可能它很管用,但是理解起来比较复杂,分支也相当乱,想了解具体步骤的话,需要去认真阅读源码。我在这里简要概括下基本逻辑:
- 首先,扫一遍定时器链表(usUntilEarliestTimer),找到剩余时间最少的一个定时器,假设剩余时间是 x
- 然后,调用 epoll_wait 阻塞住,epoll_wait 的超时时间设置为上面的 x 。
- 如果上一步的链表中没有 时间事件,那么 x=-1,epoll_wait 会永久阻塞直到下一个 文件事件 到来
- 如果遍历链表的时候,发现有已经过期的 时间事件,则 x=0,epoll_wait 立刻返回,直接去处理 时间事件
- 在 x 时间内,如果没有文件事件,那么 epoll_wait 超时,去处理 时间事件(processTimeEvents)(如果 时间事件 是需要立刻执行的(AE_DONT_WAIT)那么超时时间设为 0,即立刻返回)
- 在 x 时间内,如果有文件事件,那么 epoll_wait 返回,去处理 文件事件。处理完 文件事件 之后,再去搜索一遍 时间事件 链表。
/*
* 一般情况下,先处理所有到期的 时间事件(time event),再处理所有文件事件(file event)(因为时间事件可能会触发新的文件事件)
* 如果 flags 中没有特殊的标记位的话,这个方法会 sleep,直到 文件事件 触发,或者下一个 时间事件 到来。
*
* If flags is 0, the function does nothing and returns.
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
* if flags has AE_FILE_EVENTS set, file events are processed.
* if flags has AE_TIME_EVENTS set, time events are processed.
* if flags has AE_DONT_WAIT set the function returns ASAP until all
* the events that's possible to process without to wait are processed.
* if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
* if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
*
* 方法返回处理完成的事件的数量
*/
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
......
/* 只处理 时间事件 或 文件事件 */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* 这里就是 event loop 的核心逻辑 */
/* Note that we want to call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
......
// 遍历时间事件链表,如果有 item,将超时时间放到 tvp 里
struct timeval tv, *tvp;
......
/* Before sleep callback. */
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
/* 调用 multiplexing API,这个方法里,调用了 epoll_wait,所以会阻塞住,
* epoll_wait 超时时间是 tvp,如果 tvp 是空,则一直阻塞,直到下一个事件到来
* will return only on timeout or when some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
// 遍历事件列表,依次处理事件
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* 一般情况下,我们可以很快地处理完读请求,所以这里优先处理 读事件。
* 某些情况下,需要在 beforeSleep() hook 中处理事情的时候(比如需要把文件写回磁盘),会优先处理 写事件 */
int invert = fe->mask & AE_BARRIER;
/* 处理读事件 */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
/* 处理写事件 */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* 如果“读事件-写事件”被反转了,这时候再处理读事件 */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
/* 处理时间事件(time events);具体的处理器,在创建这个事件的时候就作为参数指定好了,即 long long aeCreateTimeEvent( ... aeTimeProc *proc ...) , redis 中比较典型的一个时间事件处理器,就是 evictionTimeProc */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* 返回处理完成的事件的数量 */
}
接下来我们看下 processTimeEvents 方法,它的主要工作是遍历链表,删除不再需要执行的定时器,执行已超时的定时器:
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
monotime now = getMonotonicUs();
while(te) {
/* 删除掉已经执行过的,且不再需要执行的定时器 */
if (te->id == AE_DELETED_EVENT_ID) {
......
// 调用 createTimeEvent 时候注册的 finalizer 方法,释放资源
if (te->finalizerProc) {
te->finalizerProc(eventLoop, te->clientData);
now = getMonotonicUs();
}
zfree(te);
te = next;
continue;
}
/* 这段注释有点意思,大家可以仔细读一读,体会下作者的良苦用心
* Make sure we don't process time events created by time events in
* this iteration. Note that this check is currently useless: we always
* add new timers on the head, however if we change the implementation
* detail, this check may be useful again: we keep it here for future
* defense. */
if (te->id > maxId) {
te = te->next;
continue;
}
// 真正的处理逻辑,调用创建 timeEvent 时候注册的 handler
if (te->when <= now) {
int retval;
id = te->id;
te->refcount++;
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
now = getMonotonicUs();
// 定时器内容处理完后,会返回一个 int 值,表示下次执行需要等待的时间(秒)
// 如果是 AE_NOMORE=-1,则表示不需要在执行了,下个 eventloop 会从链表删除掉该节点
if (retval != AE_NOMORE) {
te->when = now + retval * 1000;
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
其中一段注释非常有意思,"we keep it here for future defense" 这应该是一个优秀的工程师应该具备的基本素养之一。
至此,Redis 的 ae 事件模型已经基本学习完毕了。其他部分的代码,“线程模型&网络框架” 不再做讨论。