接下来,我们分析下redis中事件的处理逻辑。
在函数initServer中调用aeCreateEventLoop完成初始化后,在main函数中调用ae_main,该函数是一个死循环:
static void initServer() {
---
server.el = aeCreateEventLoop();
---
}
int main(int argc, char **argv) {
---
initServer();
---
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
---
}
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
尽管aeMain函数有退出条件,但除了基准测试中会调用aeStop修改该值,该条件不会被改变。
aeMain在处理event之前,先调用beforeSleep,该函数先处理已ready的client,然后刷新aof缓冲区(aof机制后续章节会详细分析):
static void beforeSleep(struct aeEventLoop *eventLoop) {
REDIS_NOTUSED(eventLoop);
/* Awake clients that got all the swapped keys they requested */
if (server.vm_enabled && listLength(server.io_ready_clients)) {
listIter li;
listNode *ln;
listRewind(server.io_ready_clients,&li);
while((ln = listNext(&li))) {
redisClient *c = ln->value;
struct redisCommand *cmd;
/* Resume the client. */
listDelNode(server.io_ready_clients,ln);
c->flags &= (~REDIS_IO_WAIT);
server.vm_blocked_clients--;
aeCreateFileEvent(server.el, c->fd, AE_READABLE,
readQueryFromClient, c);
cmd = lookupCommand(c->argv[0]->ptr);
assert(cmd != NULL);
call(c,cmd);
resetClient(c);
/* There may be more data to process in the input buffer. */
if (c->querybuf && sdslen(c->querybuf) > 0)
processInputBuffer(c);
}
}
/* Write the AOF buffer on disk */
flushAppendOnlyFile();
}
aeMain调用aeProcessEvents处理文件事件和timer事件。aeProcessEvents 先获得最先超时的timer,并记下该timer距此时的时间段,将该时间段作为aeApiPoll的超时时间(以能尽快调用timer处理,因为是先处理file事件,后处理timer事件),aeApiPoll返回后将调用注册的read、write函数进行读写:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
/* Calculate the time missing for the nearest
* timer to fire. */
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to se the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
// tvp为最近的一个timer
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
接着,aeProcessEvents调用 processTimeEvents处理timer事件(此时至少有一个超时),processTimeEvents循环处理已超时的timer。注意,processTimeEvent并不一定会删除超时的timer,代码如下:
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
// 中间注册的id必然比maxid大
while(te) {
long now_sec, now_ms;
long long id;
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
---
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
aeDeleteTimeEvent(eventLoop, id);
}
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}
当timer超时时,会调用timer创建时注册的timeProc,根据timerProc的返回值,是删除还是继续修改超时时间。注意,redis的主要循环处理函数serverCron就是靠这种定时机制得以反复运行的,该定时处理函数就一直返回100,这样就使得redis每隔100ms执行一次serverCron函数。
因此,redis的主要循环逻辑为一开始使用beforeSleep处理ready的client,然后处理相关的文件event,最后调用serverCron做一些工作。
下面一节分析下serverCron所做的工作。
你好,请问:“中间注册的id必然比maxid大”这句话如何理解?eventLoop->timeEventNextId不是每次注册一个timeEvent事件就加1吗?为何这里还会有te->id大于maxId?
呃 是我没看仔细。现在明白了,之前没考虑到te->timeProc(eventLoop, id, te->clientData)可能会注册timeEvent事件。
嗯~~ 是这样的~~
请教下,redis的实现的框架中,定时器事件还是有死循环的可能啊,假设有两个事件A和B,A是链表头,A每100ms执行一次,B执行需要150ms。
那么首先执行A,A执行完后,设置下次超时时间,然后从A开始执行,此时A还没超时跳过执行,执行B,B执行完后又从头开始执行,此时A已经超时,执行A,这时不是死循环了吗
不知道我的理解对不对,A循环执行并且该循环时间比较短(就是你说的100ms),而且执行很慢(你插入那个B好像也就是这个意思吧,其实这个无所谓,可以还有B、C、D,无非就是模拟中间还有其他函数费了不少时间),导致的结果就是A超时后处理然后继续超时,继续调用A的超时函数导致循环,因为redis在处理完一个超时事件后会返回到链表头重新检查。
这个确实形成了死循环。但这没有逻辑问题。从要求上讲,A要求定时执行,既然超时了(每次返回到链表头检查时),当然要执行罗。。。
只能说redis不适合这种场景。redis适合于IO密集型,不适合计算密集型。问题中由于A、又有B,导致B处理完A就超时了,典型的计算密集型,CPU根本就没有空出来啊~ 网络操作是IO密集型,redis才有用武之地。另一方面,记得redis整个源码中也只添加了serverCron一个定时器,而且这个函数的执行应该需要严格控制,比如那个redis中的db数就不应太多,so其实那个定时器链表其实没什么用,也难怪查找时作者说没有优化的必要了。。。
想起了之前别人问过我的一个问题,也是关于这个time链表。这个链表是无序的,如果链表上有大量的超时time,按照这个顺序处理逻辑就可能不对了。因为按理说至少要排个序嘛,超时也得有个先后。。。当时很囧,没有代码在手边,没回答上这样做在redis中会不会存在问题。道理其实很简单,因为redis中暂时就一个time嘛。。。当然,如果要迁移这个框架为你所用,是需要改改的。。。比如可以用跳表、堆什么的优化一下。。。。
你好,REDIS_NOTUSED(eventLoop),如何理解
您好,请问redis中的epoll模型没有采用更为高效的ET模式有什么原因吗?
可以参考下zhihu上的讨论:
http://www.zhihu.com/question/20502870