Redis源码剖析搁浅了一段时间,由于自己对事件驱动以及Reactor模式的理解不够深,源码看起来比较吃力,思来想去,所幸自己去实现一个简单的事件驱动模型。于是,采用python的select和queue模块开发了一个简易聊天服务器,实践中学习到的东西很多,回头再来看Redis的ae事件源码,明显轻松多了。

事件概述

Redis采用Reactor模式来处理事件,简单点来说,Redis会将所有待处理的事件放入事件池,然后挨个取出其中的事件,调用该事件的回调函数对其进行处理,这样的模式就称为Reactor模式,其处理事件的态度就是:「不要来找我,安静的取号排队,我会依次叫号的」。

Redis的事件分为文件事件和时间事件两种。其中,文件事件采用I/O多路复用技术,监听感兴趣的I/O事件,例如读、写事件等,当事件的描述符变为可读或可写时,就将该事件放入待处理事件池,Redis在事件循环的时候会对其进行一一处理。

而时间事件则是维护一个定时器,每当满足预设的时间要求,就将该时间事件标记为待处理,然后在Redis的事件循环中进行处理。Redis对于这两种事件的处理优先级是:文件事件优先于时间事件。

事件数据结构

在上述的概述中,我们可以获得几个关键池:文件事件,时间事件,待处理事件池和事件循环,Redis为这几个关键词都定义了各自的结构体。下面,依次来看一看:

/* 文件事件结构体 */
typedef struct aeFileEvent {
    int mask; // 事件标记,读or写
    aeFileProc *rfileProc;  // 读事件处理函数
    aeFileProc *wfileProc;  // 写事件处理函数
    void *clientData;  // 事件中包含的待处理数据
} aeFileEvent;

很明显,文件事件在处理过程中,需要根据mask来判断调用读或者写函数来处理该事件,这个很好理解。

/* 时间事件结构体 */
typedef struct aeTimeEvent {
    long long id; // 时间事件标识符,用于唯一标记该时间事件
    long when_sec; // 时间事件触发时间 秒
    long when_ms; // 时间事件触发时间 微秒
    aeTimeProc *timeProc;  // 该事件对应的处理函数
    aeEventFinalizerProc *finalizerProc;   // 时间事件的最后一次处理程序,若已设置,则删除该事件的时候会调用
    void *clientData; // 数据
    struct aeTimeEvent *next; // 下一个时间事件
} aeTimeEvent;

在上述结构体中,该时间事件的标识符包括,时间事件id和触发时间when,每次时间循环的时候判断是否到了触发时间,如到了,则调用对应的处理函数进行处理,该时间事件中还定义了该时间事件最后一次的处理函数。另外,next指针指向下一个时间事件,有些时间事件比如隔多少秒执行一次的,这时候next指针就派上用场了。

/* 触发时间结构体 */
typedef struct aeFiredEvent {
    int fd;  // 文件事件描述符
    int mask;  // 读写标记
} aeFiredEvent;

每次调用I/O复用程序之后,会返回已经准备好的文件事件描述符,这时候就会以该结构体的形式存放下来。Redis在事件循环的时候,会对这些已准备好待处理的事件一一进行处理,也就是上面说的待处理事件池。

/* 事件循环结构体 */
typedef struct aeEventLoop {
    int maxfd;   // 当前注册的最大描述符
    int setsize; // 需要监听的描述符个数
    long long timeEventNextId; // 下一个时间事件的id,用于生成时间事件的唯一标识
    time_t lastTime;     // 上一次事件循环的时间,用于检测系统时间是否变更
    aeFileEvent *events; // 注册要使用的文件事件
    aeFiredEvent *fired; // 已准备好,待处理的时间
    aeTimeEvent *timeEventHead; // 时间事件头,因为事件时间其实是一个链表
    int stop; //  停止标识,1表示停止
    void *apidata; // 用于处理底层特定的API数据,对于epoll来说,其包括epoll_fd和epoll_event
    aeBeforeSleepProc *beforesleep; // 没有待处理事件时调用
} aeEventLoop;

时间循坏结构中,存放着所有文件事件和时间事件,以及已准备好的事件,还有以下标识参数等。这些使得事件循环能够安全,高效的运行。

事件创建和删除

我们知道,Redis的事件分为文件事件和时间事件,上述也给出了其分别对应的结构体。首先,我们去看看如何创建和删除事件。

/* 创建文件事件 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);
/* 删除文件事件 */
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
/* 根据文件描述符获取文件事件 */
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
/* 创建时间事件 */
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc);
/* 删除时间事件 */
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);

这些函数都非常简单,就是对其结构体进行一些初始化操作,我们主要来了解一下在什么时候创建事件,继续跟踪代码的话,这些就一目了然。

关于文件事件,有两个地方需要创建它:

  • 初始化服务器的时候,需要监听新的客户连接
/* 截取至server.c/initServer函数 */
/* 服务器可能有多个文件描述符,根据每个文件描述符来创建监听事件,监听新客户的连接 */
for (j = 0; j < server.ipfd_count; j++) { 
	if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
		acceptTcpHandler,NULL) == AE_ERR)
	{
		serverPanic("Unrecoverable error creating server.ipfd file event.");
	}
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
    acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
  • 在客户连接服务器之后,需要监听该客户的读写事件
/* 代码截取至netWorking.c/createClient函数 */
/* 新用户连接的时候需要创建client结构体,此时就需要创建文件事件,用来监听其读写操作 */
if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR)
{
  close(fd);
  zfree(c);
  return NULL;
}

关于时间事件,Redis在初始化服务器的时候就会创建,前面我们提到的AOF和RDB持久化以及过期键的处理等操作中,都设计到定时操作,时间事件就是为了这些定时操作而设定的,在特定的时间触发时间事件,并进行相应的处理。

/* 截取至server.c/initServer函数 */
/* 创建时间事件,用于处理定时任务 */
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create the serverCron time event.");
        exit(1);
    }

I/O多路复用

在进行文件事件处理前,需要调用I/O多路复用来获得已经准备就绪的文件描述符,关于I/O多路复用,有以下四种函数可以使用。

  • select
  • evport(libevent)
  • epoll
  • kqueue

其中,不同的系统支持不同的多路复用函数,linux下可以使用select和epoll,macos下可以使用select和kqueue,如果安装有libevent,则可以使用evport。对于I/O多路复用,不懂的可以参考我的这篇学习笔记:UNIX网络编程之五:select和poll函数

Redis为了适用于不同的操作系统,同时也要达到最高的效率,定义了如下库选择宏,用于根据当前操作系统的环境来选择最高效的I/O多路复用函数。

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

另外,为了统一的使用这些I/O复用函数,Redis对其进行了特定的封装,使得对外的接口得到了统一,操作起来也很方便。

/* 添加需要监听的事件 */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
/* 初始化I/O多路复用库所需的参数 */
static int aeApiCreate(aeEventLoop *eventLoop)
/* 删除不需要监听的事件 */
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask)
/* 清空 */
static void aeApiFree(aeEventLoop *eventLoop)
/* 返回当前使用的库的名字 */
static int aeApiName(void)
/* 取出已准备好的文件描述符 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)

我们以select为例,其可以监听读或写类型的文件描述符,所以,需要定义一个结构体来存放需要监听的读写类型的文件描述符集合。

typedef struct aeApiState {
    fd_set rfds, wfds;  // 读写文件描述符集合
    /* We need to have a copy of the fd sets as it's not safe to reuse
     * FD sets after select(). */
    fd_set _rfds, _wfds;
} aeApiState;

在eventloop结构体中,apidata字段保存的数据类型就是aeApiState。增加,删除,清空等操作都是对这两个文件描述符集进行操作,没什么好讲的,主要是aeApiPoll函数,其调用底层的I/O复用函数,获取已经准备好的描述符集。

/* 调用select函数,获取已经准备好的描述符集 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, j, numevents = 0;

    memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
    memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
	// 调用select函数
    retval = select(eventLoop->maxfd+1,
                &state->_rfds,&state->_wfds,NULL,tvp);
    if (retval > 0) {
        for (j = 0; j <= eventLoop->maxfd; j++) {
            int mask = 0;
            // 找到该描述符对应的事件
          	aeFileEvent *fe = &eventLoop->events[j];
			// 根据mask判断读或写事件,并在需要监听的描述符中确定存在该描述符
            if (fe->mask == AE_NONE) continue;
            if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                mask |= AE_WRITABLE;
            // 加入到已经准备好的事件中,等待处理
            eventLoop->fired[numevents].fd = j;
            eventLoop->fired[numevents].mask = mask;
            numevents++;
        }
    }
    // 返回已经准备好的事件个数
    return numevents;
}

简单点来说,I/O多路复用函数的作用就是,将给定的需要监听的描述符集作为输入,该函数轮询这些描述符集,然后找出其中已经准备好的文件描述符集。

事件循环

上面调用I/O多路复用程序已经获取了准备好的事件,接下来就需要对这些事件进行处理。在server.c文件里,Redis的入口函数main函数中,就调用了aeMain()函数,用于执行事件循环。

/* 事件循环主函数 */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;  // 开启事件循环
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);  // 事件处理函数
    }
}

我们可以看到,基本上事件循环自服务器运行开始,就一直不停的执行,不停地处理各类事件。关于事件处理的具体函数如下:

/* 事件处理函数 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    // 没有需要处理的事件就直接返回
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    // 即使没有要处理的文件事件,只要我们想处理时间事件就需要调用select()函数
    // 这是为了睡眠直到下一个时间事件准备好。
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
		// 获取最近的时间事件
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // 运行到这里说明时间事件存在,则根据最近可执行时间事件和现在的时间的时间差
            // 来决定文件事件的阻塞事件
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            // 计算下一次时间事件准备好的时间
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;
            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                // 时间差小于0,代表可以处理了
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            // 执行到这里,说明没有待处理的时间事件
            // 此时根据AE_DONT_WAIT参数来决定是否设置阻塞和阻塞的时间
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
		// 调用I/O复用函数获取已准备好的事件
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // 从已就绪事件中获取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

            // 如果为读事件则调用读事件处理函数
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 如果为写事件则调用写事件处理函数
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    // 处理时间事件,记住,此处说明Redis的文件事件优先于时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
	// 返回处理的事件个数
    return processed;
}

简而言之,事件的处理步骤就是:

  • 查找最早的时间事件,判断是否需要执行,如需要,就标记下来,等待处理
  • 获取已准备好的文件事件描述符集
  • 优先处理读事件
  • 处理写事件
  • 如有时间事件,就处理时间事件

事件处理函数

前面几小节中,分析了创建事件,事件循环,那么对于待处理的事件,Redis对这些事件都进行哪些处理呢?

时间事件处理

在创建时间事件的函数中,传入了时间事件的真正处理函数serverCron,该函数在之前的源码分析中已多次提到,由于源码过长,这里只简要总结一下该函数中执行哪些操作。

  • 更新服务器的各类统计信息,比如时间、内存占用、数据库占用等
  • 清理数据库中的过期键值对
  • 关闭和清理连接失效的客户端
  • 尝试进行AOF和RDB持久化操作
  • 如果是主服务器,就对从服务器进行定期同步
  • 如果是集群模式,对集群进行定期同步和连接测试

文件事件处理

在注册文件事件的时候,给定的文件处理函数是acceptTcpHandlerreadQueryFromClient。我们先来看看前者。

很显然,前者对应的事件是为了监听新客户的连接,其事件处理应该是:处理客户的连接,并创建新客户结构体,初始化等

/* 监听新客户连接事件的处理函数*/
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);
	// TCP连接,接受客户的连接
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        // 连接出错
      	if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        // 记录日志
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        // 初始化客户操作
      	acceptCommonHandler(cfd,0,cip);
    }
}

上述函数只是接受了该客户的连接,真正创建客户结构体的操作由acceptCommonHandler函数完成。该函数的源码就不列出来了,这里主要说一下它的主要工作:

  • 创建redisClient结构体,并建立和保存服务器和客户端的连接信息
  • 为该客户端的套接字注册读事件,相应的回调函数为readQueryFromClient(这个在下面介绍)

后者则是创建了客户端结构体之后,需要监听客户端的读操作的事件处理函数,用于读取客户端穿来的命令,并执行相关操作,再回复给客户端。

所以,针对文件事件的处理就是:监听新客户的连接,如连接就创建新客户,然后继续监听该客户的读事件,如客户发送命令,就解析命令,执行相应的处理,然后回复客户端。

小结

本篇博客分析了事件的创建,循环和处理过程,到此,结合以前的分析,我们基本上可以理解客户端传来命令之后,服务器进行相应处理,然后回复客户端,这整个大循环过程了。对于时间事件和文件事件,想用下面的比特图来总结一下。

time ------------------------------------------------------------------------------->
    |<------10ms------->|<------10ms------->|<------10ms------->|<------10ms------->|
    | fileEvent1  | fileEvent2 |   block   | fileEvent3  | fileEvent4 |   block   |
    |                          | timeEvent1|                          | timeEvent2|

这张图告诉我们,在时间事件执行的时候,文件事件是处于阻塞状态的,所以,再一次印证了Redis是单线程的,整个事件循环是串行的。

由于本人是边看源码边写剖析,所以很多没有注意到或者理解有误的地方,请您在留言区指出,也希望各位学习Redis的同学能联系我,一起探讨和学习Redis!

张程,于湖北·武汉