开发技术分享

  • 文章分类
    • 技术
    • 工作
    • 相册
    • 杂谈
    • 未分类
  • 工具
    • AI 试衣
靡不有初,鲜克有终
[换一句]
  1. 首页
  2. 技术
  3. 正文

Redis 高性能主线 – 线程模型&网络框架

2021年12月29日 1536点热度

在看这篇文章之前,需要先对 linux 的 epoll 模型有所了解。请参考:

  1. Linux IO 模型
  2. IO 多路复用

Redis内置了一个高性能事件循环器,叫做AE。其定义和实现可以在ae*.h/cpp这些文件中找到。

事件驱动模型

Redis 的事件驱动模型处理 2 类事件:

  • 文件事件,如连接建立、接受请求命令、发送响应等;
  • 时间事件,如 Redis 中定期要执行的统计、key 淘汰、缓冲数据写出、rehash等。

一、文件事件处理

Redis 的文件事件采用典型的 Reactor 模式进行处理。Redis 文件事件处理机制分为 4 部分:

  1. 连接 socket
  2. IO 多路复用程序
  3. 文件事件分派器
  4. 事件处理器

虽然多个文件事件可能会并发出现,但 IO 多路复用程序总会将所有产生事件的 socket 放入一个队列中,通过这个队列,有序的把这些文件事件通知给文件分派器。

1. IO多路复用

Redis 封装了 4 种多路复用程序,每种封装实现都提供了相同的 API 实现。编译时,会按照性能和系统平台,选择最佳的 IO 多路复用函数作为底层实现,选择顺序是,首先尝试选择 Solaries 中的 evport,如果没有,就尝试选择 Linux 中的 epoll,否则就选择大多 UNIX 系统都支持的 kqueue,这 3 个多路复用函数都直接使用系统内核内部的结构,可以服务数十万的文件描述符。

如果当前编译环境没有上述函数,就会选择 select 作为底层实现方案。Redis 的这 4 种实现,分别在 ae_evport、ae_epoll、ae_kqueue 和 ae_select 这 4 个代码文件中。

2. 文件事件收集及派发器

Redis 中的文件事件分派器是 aeProcessEvents 函数。它会首先计算最大可以等待的时间,然后利用 aeApiPoll 等待文件事件的发生。如果在等待时间内,一旦 IO 多路复用程序产生了事件通知,则会立即轮询所有已产生的文件事件,并将文件事件放入 aeEventLoop 中的 aeFiredEvents 结构数组中。每个 fired event 会记录 socket 及 Redis 读写事件类型。

这里会涉及将多路复用中的事件类型,转换为 Redis 的 ae 事件驱动模型中的事件类型。以采用 Linux 中的 epoll 为例,会将 epoll 中的 EPOLLIN 转为 AE_READABLE 类型,将 epoll 中的 EPOLLOUT、EPOLLERR 和 EPOLLHUP 转为 AE_WRITABLE 事件。

aeProcessEvents 在获取到触发的事件后,会根据事件类型,将文件事件 dispatch 派发给对应事件处理函数。如果同一个 socket,同时有读事件和写事件,Redis 派发器会首先派发处理读事件,然后再派发处理写事件。

3. 文件事件处理函数分类

Redis 中文件事件函数的注册和处理主要分为 3 种。
● 连接处理函数 acceptTcpHandler

Redis 在启动时,在 initServer 中对监听的 socket 注册读事件,事件处理器为 acceptTcpHandler,该函数在有新连接进入时,会被派发器派发读任务。在处理该读任务时,会 accept 新连接,获取调用方的 IP 及端口,并对新连接创建一个 client 结构。如果同时有大量连接同时进入,Redis 一次最多处理 1000 个连接请求。

● readQueryFromClient 请求处理函数

连接函数在创建 client 时,会对新连接 socket 注册一个读事件,该读事件的事件处理器就是 readQueryFromClient。在连接 socket 有请求命令到达时,IO 多路复用程序会获取并触发文件事件,然后这个读事件被派发器派发给本请求的处理函数。readQueryFromClient 会从连接 socket 读取数据,存入 client 的 query 缓冲,然后进行解析命令,按照 Redis 当前支持的 2 种请求格式,及 inline 内联格式和 multibulk 字符块数组格式进行尝试解析。解析完毕后,client 会根据请求命令从命令表中获取到对应的 redisCommand,如果对应 cmd 存在。则开始校验请求的参数,以及当前 server 的内存、磁盘及其他状态,完成校验后,然后真正开始执行 redisCommand 的处理函数,进行具体命令的执行,最后将执行结果作为响应写入 client 的写缓冲中。

● 命令回复处理器 sendReplyToClient

当 redis需要发送响应给client时,Redis 事件循环中会对client的连接socket注册写事件,这个写事件的处理函数就是sendReplyToClient。通过注册写事件,将 client 的socket与 AE_WRITABLE 进行间接关联。当 Client fd 可进行写操作时,就会触发写事件,该函数就会将写缓冲中的数据发送给调用方。

二、 时间事件处理

Redis 中的时间事件是指需要在特定时间执行的事件。多个 Redis 中的时间事件构成 aeEventLoop 中的一个链表,供 Redis 在 ae 事件循环中轮询执行。

Redis 当前的主要时间事件处理函数有 2 个:
● serverCron
● moduleTimerHandler

Redis 中的时间事件分为 2 类:
● 单次时间,即执行完毕后,该时间事件就结束了。
● 周期性事件,在事件执行完毕后,会继续设置下一次执行的事件,从而在时间到达后继续执行,并不断重复。

时间事件主要有 5 个属性组成。
● 事件 ID:Redis 为时间事件创建全局唯一 ID,该 ID 按从小到大的顺序进行递增。
● 执行时间 when_sec 和 when_ms:精确到毫秒,记录该事件的到达可执行时间。
● 时间事件处理器 timeProc:在时间事件到达时,Redis 会调用相应的 timeProc 处理事件。
● 关联数据 clientData:在调用 timeProc 时,需要使用该关联数据作为参数。
● 链表指针 prev 和 next:它用来将时间事件维护为双向链表,便于插入及查找所要执行的时间事件。

时间事件的处理是在事件循环中的 aeProcessEvents 中进行。执行过程是:

  1. 首先遍历所有的时间事件。
  2. 比较事件的时间和当前时间,找出可执行的时间事件。
  3. 然后执行时间事件的 timeProc 函数。
  4. 执行完毕后,对于周期性时间,设置时间新的执行时间;对于单次性时间,设置事件的 ID为 -1,后续在事件循环中,下一次执行 aeProcessEvents 的时候从链表中删除。

源码解析

1. 创建 eventLoop

我们从 Redis 的启动开始看起(server.c)(本节我们重点关注 ae 事件模型,其他代码先忽略掉)

void initServer(void) {

    ......

    // 处理 signal,这里的 signal 有两大类:
    // 1. kill 命令:通过 serverCron 优雅的关闭程序,而不是强制退出。
    // 2. 严重错误:主要是记录现场,关闭程序。
    setupSignalHandlers();

    ......

    // 创建事件循环
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

    ......

    // 监听端口
    if (server.port != 0 &&
        listenToPort(server.port,&server.ipfd) == C_ERR) {
        serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
        exit(1);
    }

    ......

    // 创建 时间事件,处理后台任务 serverCron,比如客户端超时、淘汰过期数据等
    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }

    // 创建 文件事件处理函数,用于监听连接
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TCP socket accept handler.");
    }
    if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TLS socket accept handler.");
    }
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");

    /* Register a readable event for the pipe used to awake the event loop
     * when a blocked client in a module needs attention. */
    if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
        moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
            serverPanic(
                "Error registering the readable event for the module "
                "blocked clients subsystem.");
    }

    ......
}

我们先看下 eventloop 的结构:

typedef struct aeEventLoop {

    /* 注册进 eventloop 的最大的 fd */
    int maxfd;

    /* 最多可以持有多少个文件描述符 */
    int setsize;

    long long timeEventNextId;

    /* 注册进 eventloop 的文件事件,数组,大小等于 setsize
    aeFileEvent *events;

    /* 当前已触发的事件 */
    aeFiredEvent *fired;

    // 链表,处理事件的时候,从表头开始遍历
    aeTimeEvent *timeEventHead;

    // eventloop 的运行状态
    int stop;

    /* This is used for polling API specific data */
    // 根据多路复用的 api 不同,具体结构有所不同,对于 linux 的 epoll,存放着 epfd 和 epoll_event
    void *apidata;

    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;

这里我们看下 aeCreateEventLoop(ae.c)方法:

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    monotonicInit();    /* just in case the calling app didn't initialize */

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    // 时间事件链表头
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    // 停止标识符,表示 eventLoop 的运行状态,0为运行,1为停止
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    // 进入poll前的回调函数
    eventLoop->beforesleep = NULL;
    // 进入poll后的回调函数
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;

    // 根据选择的ae底层原理不同,aeApiCreate会有不同的实现;
    // 在Linux中使用epoll进行实现(ae_epoll.c),主要工作是 将 aeEventLoop->apiData
    // 指向 epfd 和 epoll_event
    if (aeApiCreate(eventLoop) == -1) goto err;

    ......
    return eventLoop;
}

aeApiCreate 针对不同的操作系统有不同的实现。比如 Unix 中是 ae_kqueue.c,这里我们看下 Linux 中的 ae_epoll.c,其中就调用了 epoll_create 方法

static int aeApiCreate(aeEventLoop *eventLoop) {
    ......
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    ......
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    ......
    return 0;
}

ae_epoll.c 中还有一些方法,都是对底层 epoll 的封装。事实上,不管那种 IO 复用方式,只要实现下面这些接口,就可以正常的对接 Redis 了

int aeApiCreate(aeEventLoop *eventLoop);
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
void aeApiResize(aeEventLoop *eventLoop, int setsize);
void aeApiFree(aeEventLoop *eventLoop);
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask);
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask);
int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
char *aeApiName(void);

创建完 eventLoop 之后,紧接着创建了监听器,用于监听 TCP 连接;createSocketAcceptHandler 这个方法内部实际上是:

  1. 调用了 epoll 的 epoll_ctl 方法,添加了一个 fd
  2. 定义了一个 acceptTcpHandler;用于处理连接请求:底层 epoll 将事件传递过来之后,用来连接用户 client

2. 启动 eventLoop

回到主线,在执行完 initServer 之后,会调用 ae.c 中的 aeMain 方法,开启事件循环:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

这里 aeProcessEvents 的第二个参数

AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP

并不是要处理的事件本身,而是表示要处理哪些类型的事件。如果当前空闲,暂时没有事情要处理,aeProcessEvents 方法会一直阻塞在 epoll_wait 上,所以这里的 while (!eventLoop->stop) 并不会导致 cpu 跑飞。

3. ae 事件模型的核心模块 - 处理事件

aeProcessEvents 的核心处理逻辑条件判断比较复杂,我感觉写的不是很好,可能它很管用,但是理解起来比较复杂,分支也相当乱,想了解具体步骤的话,需要去认真阅读源码。我在这里简要概括下基本逻辑:

  • 首先,扫一遍定时器链表(usUntilEarliestTimer),找到剩余时间最少的一个定时器,假设剩余时间是 x
  • 然后,调用 epoll_wait 阻塞住,epoll_wait 的超时时间设置为上面的 x 。
    • 如果上一步的链表中没有 时间事件,那么 x=-1,epoll_wait 会永久阻塞直到下一个 文件事件 到来
    • 如果遍历链表的时候,发现有已经过期的 时间事件,则 x=0,epoll_wait 立刻返回,直接去处理 时间事件
  • 在 x 时间内,如果没有文件事件,那么 epoll_wait 超时,去处理 时间事件(processTimeEvents)(如果 时间事件 是需要立刻执行的(AE_DONT_WAIT)那么超时时间设为 0,即立刻返回)
  • 在 x 时间内,如果有文件事件,那么 epoll_wait 返回,去处理 文件事件。处理完 文件事件 之后,再去搜索一遍 时间事件 链表。
/*
 * 一般情况下,先处理所有到期的 时间事件(time event),再处理所有文件事件(file event)(因为时间事件可能会触发新的文件事件)
 * 如果 flags 中没有特殊的标记位的话,这个方法会 sleep,直到 文件事件 触发,或者下一个 时间事件 到来。
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
 *
 * 方法返回处理完成的事件的数量
 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    ......

    /* 只处理 时间事件 或 文件事件 */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* 这里就是 event loop 的核心逻辑 */
    /* Note that we want to call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {

        ......

        // 遍历时间事件链表,如果有 item,将超时时间放到 tvp 里
        struct timeval tv, *tvp;
        ......

        /* Before sleep callback. */
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* 调用 multiplexing API,这个方法里,调用了 epoll_wait,所以会阻塞住,
         * epoll_wait 超时时间是 tvp,如果 tvp 是空,则一直阻塞,直到下一个事件到来
         * will return only on timeout or when some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // 遍历事件列表,依次处理事件
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* 一般情况下,我们可以很快地处理完读请求,所以这里优先处理 读事件。
             * 某些情况下,需要在 beforeSleep() hook 中处理事情的时候(比如需要把文件写回磁盘),会优先处理 写事件 */
            int invert = fe->mask & AE_BARRIER;

            /* 处理读事件 */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* 处理写事件 */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* 如果“读事件-写事件”被反转了,这时候再处理读事件 */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }

    /* 处理时间事件(time events);具体的处理器,在创建这个事件的时候就作为参数指定好了,即 long long aeCreateTimeEvent( ... aeTimeProc *proc ...) , redis 中比较典型的一个时间事件处理器,就是 evictionTimeProc */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* 返回处理完成的事件的数量 */
}

接下来我们看下 processTimeEvents 方法,它的主要工作是遍历链表,删除不再需要执行的定时器,执行已超时的定时器:

static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    monotime now = getMonotonicUs();
    while(te) {

        /* 删除掉已经执行过的,且不再需要执行的定时器 */
        if (te->id == AE_DELETED_EVENT_ID) {
            ......
            // 调用 createTimeEvent 时候注册的 finalizer 方法,释放资源
            if (te->finalizerProc) {
                te->finalizerProc(eventLoop, te->clientData);
                now = getMonotonicUs();
            }
            zfree(te);
            te = next;
            continue;
        }

        /* 这段注释有点意思,大家可以仔细读一读,体会下作者的良苦用心
         * Make sure we don't process time events created by time events in
         * this iteration. Note that this check is currently useless: we always
         * add new timers on the head, however if we change the implementation
         * detail, this check may be useful again: we keep it here for future
         * defense. */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }

        // 真正的处理逻辑,调用创建 timeEvent 时候注册的 handler
        if (te->when <= now) {
            int retval;

            id = te->id;
            te->refcount++;
            retval = te->timeProc(eventLoop, id, te->clientData);
            te->refcount--;
            processed++;
            now = getMonotonicUs();

            // 定时器内容处理完后,会返回一个 int 值,表示下次执行需要等待的时间(秒)
            // 如果是 AE_NOMORE=-1,则表示不需要在执行了,下个 eventloop 会从链表删除掉该节点
            if (retval != AE_NOMORE) {
                te->when = now + retval * 1000;
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}

其中一段注释非常有意思,"we keep it here for future defense" 这应该是一个优秀的工程师应该具备的基本素养之一。

至此,Redis 的 ae 事件模型已经基本学习完毕了。其他部分的代码,“线程模型&网络框架” 不再做讨论。

参考

https://www.yuque.com/jifeng-bvojj/elllo4/cfgwo7

标签: redis
最后更新:2026年3月13日

zt52875287

这个人很懒,什么都没留下

点赞
< 上一篇
下一篇 >
文章目录
  • 事件驱动模型
  • 一、文件事件处理
    • 1. IO多路复用
    • 2. 文件事件收集及派发器
    • 3. 文件事件处理函数分类
  • 二、 时间事件处理
  • 源码解析
    • 1. 创建 eventLoop
    • 2. 启动 eventLoop
    • 3. ae 事件模型的核心模块 - 处理事件
  • 参考

Copyright © by zt52875287@gmail.com All Rights Reserved.

Theme Kratos Made By Seaton Jiang

陕ICP备2021009385号-1