来源: Redis的事件机制 – lijihong0723 – 博客园
Redis程序的运行过程是一个处理事件的过程,也称Redis是一个事件驱动的服务。Redis中的事件分两类:文件事件(File Event)、时间事件(Time Event)。文件事件处理文件的读写操作,特别是与客户端通信的Socket文件描述符的读写操作;时间事件主要用于处理一些定时处理的任务。
本文首先介绍Redis的运行过程,阐明Redis程序是一个事件驱动的程序;接着介绍事件机制实现中涉及的数据结构以及事件的注册;最后介绍了处理客户端中涉及到的套接字文件读写事件。
一、Redis的运行过程
Redis的运行过程是一个事件处理的过程,可以通过下图反映出来:
图1 Redis的事件处理过程
从上图可以看出:Redis服务器的运行过程就是循环等待并处理事件的过程。通过时间事件将运行事件分成一个个的时间分片,如图1的右半部分所示。如果在指定的时间分片中,有文件事件发生,如:读文件描述符可读、写文件描述符可写,则调用相应的处理函数进行文件的读写处理。文件事件处理完成之后,处理期望发生时间在当前时间之前或正好是当前时刻的时间事件。然后再进入下一次循环迭代处理。
如果在指定的事件间隔中,没有文件事件发生,则不需要处理,直接进行时间事件的处理,如下图所示。
图2 Redis的事件处理过程(无文件事件发生)
二、事件数据结构
2.1 文件事件数据结构
Redis用如下结构体来记录一个文件事件:
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
通过mask来描述发生了什么事件:
- AE_READABLE:文件描述符可读;
- AE_WRITABLE:文件描述符可写;
- AE_BARRIER:文件描述符阻塞
rfileProc和wfileProc分别为读事件和写事件发生时的回调函数,其函数签名如下:
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
2.2 事件事件数据结构
Redis用如下结构体来记录一个时间事件:
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
} aeTimeEvent;
when_sec和when_ms指定时间事件发生的时间,timeProc为时间事件发生时的处理函数,签名如下:
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
prev和next表明时间事件构成了一个双向链表。
3.3 事件循环
Redis用如下结构体来记录系统中注册的事件及其状态:
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
这一结构体中,最主要的就是文件事件指针events和时间事件头指针timeEventHead。文件事件指针event指向一个固定大小(可配置)数组,通过文件描述符作为下标,可以获取文件对应的事件对象。
三、事件的注册过程
事件驱动的程序实际上就是在事件发生时,调用相应的处理函数(即:回调函数)进行逻辑处理。因此关于事件,程序需要知道:①事件的发生;② 回调函数。事件的注册过程就是告诉程序这两。下面我们分别从文件事件、时间事件的注册过程进行阐述。
3.1 文件事件的注册过程
对于文件事件:
- 事件的发生:应用程序需要知道哪些文件描述符发生了哪些事件。感知文件描述符上有事件发生是由操作系统的职责,应用程序需要告诉操作系统,它关心哪些文件描述符的哪些事件,这样通过相应的系统API就会返回发生了事件的文件描述符。
- 回调函数:应用程序知道了文件描述符发生了事件之后,需要调用相应回调函数进行处理,因而需要在事件发生之前将相应的回调函数准备好。
这就是文件事件的注册过程,函数的实现如下:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
这段代码逻辑非常清晰:首先根据文件描述符获得文件事件对象,接着在操作系统中添加自己关心的文件描述符(addApiAddEvent
),最后将回调函数记录到文件事件对象中。因此,一个线程就可以同时监听多个文件事件,这就是IO多路复用。操作系统提供多种IO多路复用模型,如:Select模型、Poll模型、EPOLL模型等。Redis支持所有这些模型,用户可以根据需要进行选择。不同的模型,向操作系统添加文件描述符方式也不同,Redis将这部分逻辑封装在aeApiAddEvent
中,下面代码是EPOLL模型的实现:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
这段代码就是对操作系统调用epoll_ctl()
的封装,EPOLLIN
对应的是读(输入)事件,EPOLLOUT对应的是写(输出)事件。
3.2 时间事件的注册过程
对于时间事件:
- 事件的发生:当前时刻正好是事件期望发生的时刻,或者是晚于事件期望发生的时刻,所以需要让程序知道事件期望发生的时刻;
- 回调函数:此时调用回调函数进行处理,所以需要让程序知道事件的回调函数。
对应的事件事件注册函数如下:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
这段代码逻辑也是非常简单:首先创建时间事件对象,接着设置事件,设置回调函数,最后将事件事件对象插入到时间事件链表中。设置时间事件期望发生的时间比较简单:
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
long cur_sec, cur_ms, when_sec, when_ms;
aeGetTime(&cur_sec, &cur_ms);
when_sec = cur_sec + milliseconds/1000;
when_ms = cur_ms + milliseconds%1000;
if (when_ms >= 1000) {
when_sec ++;
when_ms -= 1000;
}
*sec = when_sec;
*ms = when_ms;
}
static void aeGetTime(long *seconds, long *milliseconds)
{
struct timeval tv;
gettimeofday(&tv, NULL);
*seconds = tv.tv_sec;
*milliseconds = tv.tv_usec/1000;
}
当前时间加上期望的时间间隔,作为事件期望发生的时刻。
四、套接字文件事件
Redis为客户端提供存储数据和获取数据的缓存服务,监听并处理来自请求,将结果返回给客户端,这一过程将会发生以下文件事件:
与上图相对应,对于一个请求,Redis会注册三个文件事件:
4.1 TCP连接建立事件
服务器初始化时,在服务器套接字上注册TCP连接建立的事件。
void initServer(void) {
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
}
回调函数为acceptTcpHandler,该函数最重要的职责是创建客户端结构。
4.2 客户端套接字读事件
创建客户端:在客户端套接字上注册客户端套接字可读事件。
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
回调函数为readQueryFromClient,顾名思义,此函数将从客户端套接字中读取数据。
4.3 向客户端返回数据
Redis完成请求后,Redis并非处理完一个请求后就注册一个写文件事件,然后事件回调函数中往客户端写回结果。根据图1,检测到文件事件发生后,Redis对这些文件事件进行处理,即:调用rReadProc或writeProc回调函数。处理完成后,对于需要向客户端写回的数据,先缓存到内存中:
typedef struct client {
// ...其他字段
list *reply; /* List of reply objects to send to the client. */
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
};
发送给客户端的数据会存放到两个地方:
- reply指针存放待发送的对象;
- buf中存放待返回的数据,bufpos指示数据中的最后一个字节所在位置。
这里遵循一个原则:只要能存放在buf中,就尽量存入buf字节数组中,如果buf存不下了,才存放在reply对象数组中。
写回发生在进入下一次等待文件事件之前,见图1中【等待前处理】,会调用以下函数来处理客户端数据写回逻辑:
int writeToClient(int fd, client *c, int handler_installed) {
while(clientHasPendingReplies(c)) {
if (c->bufpos > 0) {
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
o = listNodeValue(listFirst(c->reply));
objlen = o->used;
if (objlen == 0) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
continue;
}
nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
}
}
}
上述函数只截取了数据发送部分,首先发送buf
中的数据,然后发送reply
中的数据。
有读者可能会疑惑:write()系统调用是阻塞式的接口,上述做法会不会在write()调用的地方有等待,从而导致性能低下?这里就要介绍Redis是怎么处理这个问题的。
首先,我们发现创建客户端的代码:
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
if (fd != -1) {
anetNonBlock(NULL,fd);
}
}
可以看到设置fd是非阻塞(NonBlock),这就保证了在套接字fd上的read()和write()系统调用不是阻塞的。
其次,和文件事件的处理操作一样,往客户端写数据的操作也是批量的,函数如下:
int handleClientsWithPendingWrites(void) {
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
/* Try to write buffers to the client socket. */
if (writeToClient(c->fd,c,0) == C_ERR) continue;
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
}
可以看到,首先对每个客户端调用刚才介绍的writeToClient()
函数进行写数据,如果还有数据没写完,那么注册写事件,当套接字文件描述符写就绪时,调用sendReplyToClient()
进行剩余数据的写操作:
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
writeToClient(fd,privdata,1);
}
仔细想一下就明白了:处理完得到结果后,这时套接字的写缓冲区一般是空的,因此write()函数调用成功,所以就不需要注册写文件事件了。如果写缓冲区满了,还有数据没写完,此时再注册写文件事件。并且在数据写完后,将写事件删除:
int writeToClient(int fd, client *c, int handler_installed) {
if (!clientHasPendingReplies(c)) {
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
}
}
注意到,在sendReplyToClient()函数实现中,第三个参数正好是1。