这一节主要介绍下val的加载。
对于某些命令,比如get somekey,当运行到processCommand时可能key对应的val不在内存中。在运行命令绑定的处理函数之前,redis会提前加载其val。
在 processCommand中,在vm开启并启用多线程时,会调用 blockClientOnSwappedKeys来加载可能已swap的val,如果blockClientOnSwappedKeys返回0,说明有 swap的val没被加载,则返回不调用call了(此时client会设置 REDIS_IO_WAIT标志,并已放到等待列表中)。代码如下:
static int processCommand(redisClient *c) {
---
if (server.vm_enabled && server.vm_max_threads > 0 &&
blockClientOnSwappedKeys(c,cmd)) return 1;
call(c,cmd);
---
}
在blockClientOnSwappedKeys函数中,如果命令设置了预加载函数,比如zunionstore和zinterstore就设置了预加载函数 zunionInterBlockClientOnSwappedKeys,则使用设置的预加载函数加载swap的val,否则使用 waitForMultipleSwappedKeys加载swap的val,不过查看 zunionInterBlockClientOnSwappedKeys和waitForMultipleSwappedKeys的实现就可以发现, 这些函数最终都调用waitForSwappedKey。在预加载函数返回后,若client的io_keys链表非空(io_keys是该client 不在内存中的val的链表),则设置client的REDIS_IO_WAIT标志,取消client的read 事件(在这之前,client已放到对应key的阻塞队列中了)。
static int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) {
if (cmd->vm_preload_proc != NULL) {
cmd->vm_preload_proc(c,cmd,c->argc,c->argv);
} else {
waitForMultipleSwappedKeys(c,cmd,c->argc,c->argv);
}
/* If the client was blocked for at least one key, mark it as blocked. */
if (listLength(c->io_keys)) {
c->flags |= REDIS_IO_WAIT;
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
server.vm_blocked_clients++;
return 1;
} else {
return 0;
}
}
我们看看waitForMultipleSwappedKeys的实现。waitForMultipleSwappedKeys会根据命令字表中设置的预加载参数,加载需要加载的val。
static void waitForMultipleSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) {
int j, last;
if (cmd->vm_firstkey == 0) return;
last = cmd->vm_lastkey;
if (last < 0) last = argc+last;
for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) {
redisAssert(j < argc);
waitForSwappedKey(c,argv[j]);
}
}
比如我们查看get命令字的设置。后面的1,1,1就是表示加载的val在argv中的位置,每个get命令最多需要预加载1个val。
{"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
waitForSwappedKey 涉及到vm的多线程,建议先粗略理解下,并在阅读vm章节后再返回此处阅读。该函数所做的主要工作就是将(c, key) 加到c->db->io_keys中,而db其实指向全局server的db,然后创建一个job,插入到工作线程中,让工作线程完成val 的加载。
static int waitForSwappedKey(redisClient *c, robj *key) {
struct dictEntry *de;
robj *o;
list *l;
/* If the key does not exist or is already in RAM we don't need to
* block the client at all. */
de = dictFind(c->db->dict,key);
if (de == NULL) return 0;
o = dictGetEntryKey(de);
if (o->storage == REDIS_VM_MEMORY) {
return 0;
} else if (o->storage == REDIS_VM_SWAPPING) {
/* We were swapping the key, undo it! */
vmCancelThreadedIOJob(o);
return 0;
}
/* OK: the key is either swapped, or being loaded just now. */
/* Add the key to the list of keys this client is waiting for.
* This maps clients to keys they are waiting for. */
listAddNodeTail(c->io_keys,key);
incrRefCount(key);
/* Add the client to the swapped keys => clients waiting map. */
de = dictFind(c->db->io_keys,key);
if (de == NULL) {
int retval;
/* For every key we take a list of clients blocked for it */
l = listCreate();
retval = dictAdd(c->db->io_keys,key,l);
incrRefCount(key);
assert(retval == DICT_OK);
} else {
l = dictGetEntryVal(de);
}
listAddNodeTail(l,c);
/* Are we already loading the key from disk? If not create a job */
if (o->storage == REDIS_VM_SWAPPED) {
iojob *j;
o->storage = REDIS_VM_LOADING;
j = zmalloc(sizeof(*j));
j->type = REDIS_IOJOB_LOAD;
j->db = c->db;
j->key = o;
j->key->vtype = o->vtype;
j->page = o->vm.page;
j->val = NULL;
j->canceled = 0;
j->thread = (pthread_t) -1;
lockThreadedIO();
queueIOJob(j);
unlockThreadedIO();
}
return 1;
}
插入工作线程的job在运行完后,会调用 vmThreadedIOCompletedJob,在该函数中会调用handleClientsBlockedOnSwappedKey处理阻塞的 client,而handleClientsBlockedOnSwappedKey所做的主要工作就是将所有val已加载的client放到 server.io_ready_clients中,此时client已ready好了,但还没有加入read事件循环(因为之前因为等待val已删除其 read事件)。
static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) {
struct dictEntry *de;
list *l;
listNode *ln;
int len;
de = dictFind(db->io_keys,key);
if (!de) return;
l = dictGetEntryVal(de);
len = listLength(l);
/* Note: we can't use something like while(listLength(l)) as the list
* can be freed by the calling function when we remove the last element. */
while (len--) {
ln = listFirst(l);
redisClient *c = ln->value;
if (dontWaitForSwappedKey(c,key)) {
/* Put the client in the list of clients ready to go as we
* loaded all the keys about it. */
listAddNodeTail(server.io_ready_clients,c);
}
}
}
最后还剩下一个问题,那就是处于server.io_ready_clients的clint会在什么时候增加read事件,从而继续让其接收客户端的输入了。这个工作在beforeSleep函数中完成(前面的事件循环中有详细介绍)。beforeSleep会为server.io_ready_clients中的client增加read事件,调用processInputBuffer处理其输入。
另外注意,如果client需要的val在检查时都在内存中,但当执行命令处理函数时,该val被swap出去了,则只能使用vmLoadObject直接加载了(阻塞方式)。对此种情况,redis的解释是In practical terms this should onlyhappen with SORT BY command or if there is a bug in this function(参考blockClientOnSwappedKeys前的注释)。
Pingback 引用通告: redis源代码分析15–val加载机制 | Linux C++ 中文网