redis源代码分析15–val加载机制

这一节主要介绍下val的加载。

对于某些命令,比如get somekey,当运行到processCommand时可能key对应的val不在内存中。在运行命令绑定的处理函数之前,redis会提前加载其val。

在 processCommand中,在vm开启并启用多线程时,会调用 blockClientOnSwappedKeys来加载可能已swap的val,如果blockClientOnSwappedKeys返回0,说明有 swap的val没被加载,则返回不调用call了(此时client会设置 REDIS_IO_WAIT标志,并已放到等待列表中)。代码如下:

static int processCommand(redisClient *c) {
     ---
     if (server.vm_enabled && server.vm_max_threads > 0 &&
            blockClientOnSwappedKeys(c,cmd)) return 1;
        call(c,cmd);
      ---
}

在blockClientOnSwappedKeys函数中,如果命令设置了预加载函数,比如zunionstore和zinterstore就设置了预加载函数 zunionInterBlockClientOnSwappedKeys,则使用设置的预加载函数加载swap的val,否则使用 waitForMultipleSwappedKeys加载swap的val,不过查看 zunionInterBlockClientOnSwappedKeys和waitForMultipleSwappedKeys的实现就可以发现, 这些函数最终都调用waitForSwappedKey。在预加载函数返回后,若client的io_keys链表非空(io_keys是该client 不在内存中的val的链表),则设置client的REDIS_IO_WAIT标志,取消client的read 事件(在这之前,client已放到对应key的阻塞队列中了)。

static int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) {
    if (cmd->vm_preload_proc != NULL) {
        cmd->vm_preload_proc(c,cmd,c->argc,c->argv);
    } else {
        waitForMultipleSwappedKeys(c,cmd,c->argc,c->argv);
    }

    /* If the client was blocked for at least one key, mark it as blocked. */
    if (listLength(c->io_keys)) {
        c->flags |= REDIS_IO_WAIT;
        aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
        server.vm_blocked_clients++;
        return 1;
    } else {
        return 0;
    }
}

我们看看waitForMultipleSwappedKeys的实现。waitForMultipleSwappedKeys会根据命令字表中设置的预加载参数,加载需要加载的val。

static void waitForMultipleSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) {
    int j, last;
    if (cmd->vm_firstkey == 0) return;
    last = cmd->vm_lastkey;
    if (last < 0) last = argc+last;
    for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) {
        redisAssert(j < argc);
        waitForSwappedKey(c,argv[j]);
    }
}

比如我们查看get命令字的设置。后面的1,1,1就是表示加载的val在argv中的位置,每个get命令最多需要预加载1个val。

{"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},

waitForSwappedKey 涉及到vm的多线程,建议先粗略理解下,并在阅读vm章节后再返回此处阅读。该函数所做的主要工作就是将(c, key) 加到c->db->io_keys中,而db其实指向全局server的db,然后创建一个job,插入到工作线程中,让工作线程完成val 的加载。

static int waitForSwappedKey(redisClient *c, robj *key) {
    struct dictEntry *de;
    robj *o;
    list *l;

    /* If the key does not exist or is already in RAM we don't need to
     * block the client at all. */
    de = dictFind(c->db->dict,key);
    if (de == NULL) return 0;
    o = dictGetEntryKey(de);
    if (o->storage == REDIS_VM_MEMORY) {
        return 0;
    } else if (o->storage == REDIS_VM_SWAPPING) {
        /* We were swapping the key, undo it! */
        vmCancelThreadedIOJob(o);
        return 0;
    }

    /* OK: the key is either swapped, or being loaded just now. */

    /* Add the key to the list of keys this client is waiting for.
     * This maps clients to keys they are waiting for. */
    listAddNodeTail(c->io_keys,key);
    incrRefCount(key);

    /* Add the client to the swapped keys => clients waiting map. */
    de = dictFind(c->db->io_keys,key);
    if (de == NULL) {
        int retval;

        /* For every key we take a list of clients blocked for it */
        l = listCreate();
        retval = dictAdd(c->db->io_keys,key,l);
        incrRefCount(key);
        assert(retval == DICT_OK);
    } else {
        l = dictGetEntryVal(de);
    }
    listAddNodeTail(l,c);

    /* Are we already loading the key from disk? If not create a job */
    if (o->storage == REDIS_VM_SWAPPED) {
        iojob *j;

        o->storage = REDIS_VM_LOADING;
        j = zmalloc(sizeof(*j));
        j->type = REDIS_IOJOB_LOAD;
        j->db = c->db;
        j->key = o;
        j->key->vtype = o->vtype;
        j->page = o->vm.page;
        j->val = NULL;
        j->canceled = 0;
        j->thread = (pthread_t) -1;
        lockThreadedIO();
        queueIOJob(j);
        unlockThreadedIO();
    }
    return 1;
}

插入工作线程的job在运行完后,会调用 vmThreadedIOCompletedJob,在该函数中会调用handleClientsBlockedOnSwappedKey处理阻塞的 client,而handleClientsBlockedOnSwappedKey所做的主要工作就是将所有val已加载的client放到 server.io_ready_clients中,此时client已ready好了,但还没有加入read事件循环(因为之前因为等待val已删除其 read事件)。

static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) {
    struct dictEntry *de;
    list *l;
    listNode *ln;
    int len;

    de = dictFind(db->io_keys,key);
    if (!de) return;

    l = dictGetEntryVal(de);
    len = listLength(l);
    /* Note: we can't use something like while(listLength(l)) as the list
     * can be freed by the calling function when we remove the last element. */
    while (len--) {
        ln = listFirst(l);
        redisClient *c = ln->value;

        if (dontWaitForSwappedKey(c,key)) {
            /* Put the client in the list of clients ready to go as we
             * loaded all the keys about it. */
            listAddNodeTail(server.io_ready_clients,c);
        }
    }
}

最后还剩下一个问题,那就是处于server.io_ready_clients的clint会在什么时候增加read事件,从而继续让其接收客户端的输入了。这个工作在beforeSleep函数中完成(前面的事件循环中有详细介绍)。beforeSleep会为server.io_ready_clients中的client增加read事件,调用processInputBuffer处理其输入。

另外注意,如果client需要的val在检查时都在内存中,但当执行命令处理函数时,该val被swap出去了,则只能使用vmLoadObject直接加载了(阻塞方式)。对此种情况,redis的解释是In practical terms this should onlyhappen with SORT BY command or if there is a bug in this function(参考blockClientOnSwappedKeys前的注释)。

此条目发表在 redis 分类目录。将固定链接加入收藏夹。

redis源代码分析15–val加载机制》有 1 条评论

  1. Pingback 引用通告: redis源代码分析15–val加载机制 | Linux C++ 中文网

发表评论

电子邮件地址不会被公开。 必填项已被标记为 *

*

您可以使用这些 HTML 标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>