redis源代码分析14–命令处理的一般过程

这个部分我们介绍下命令处理的一般过程。
在createClient时,为client的read事件设置了readQueryFromClient函数。我们来看看怎么处理client的命令的。
readQueryFromClient使用read一次读入REDIS_IOBUF_LEN字节,并保存在client中的querybuf参数中,然后调用processInputBuffer继续处理。

static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask){
   redisClient *c =(redisClient*)privdata;
   --
   nread = read(fd, buf, REDIS_IOBUF_LEN);
   ---
   if(nread){
       c->querybuf = sdscatlen(c->querybuf, buf, nread);
       c->lastinteraction = time(NULL);
   } else {
       return;
   }
   processInputBuffer(c);
}

processInputBuffer只处理不在REDIS_BLOCKED 和REDIS_IO_WAIT状态的client,也就是已ready好的client。另外如果c->bulklen ==-1(对于一般命令,c->bulklen都为-1,对于用multibulk协议传输的命令,下一个函数有更详细的介绍),则按行解析querybuf,并将解析到的参数保存在argv中,然后调用processCommand进行下一步处理,并且如果processCommand返回非0,会继续处理client输入。

static void processInputBuffer(redisClient *c) {
again:
    ---
    if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
    if (c->bulklen == -1) {
       --
        if (p) {
            ---
            if (c->argc) {
               ---
                if (processCommand(c) && sdslen(c->querybuf)) goto again;
            }
            --
        }
    } else {
        --
        int qbl = sdslen(c->querybuf);
        if (processCommand(c) && sdslen(c->querybuf)) goto again;
            return;
    }
}

processCommand这段代码对multi-bulk 协议的解析写得真不敢恭维,转来转去的,真没劲。参看代码中的解释。解析完multibulk后,如果输入的命令是quit,则表示客户端退出了,释放其连接,返回0,表示不用继续处理了。接着使用 lookupCommand查看命令在cmdTable中对应的命令项,然后又是multbulk,接着检查安全认证情况,接着检查内存使用(前面内存章节中有介绍),接着查看 pubsub_channels、pubsub_patterns长度是否为0,若不为0,则表示处于订阅模式下(后文介绍),只允许命令 subscribeCommand、unsubscribeCommand、psubscribeCommand、 punsubscribeCommand。接着如果client处于事务模式下,则在命令不是execCommand、discardCommand的情况下将命令排队(事务处理后文也有介绍)。接着看看是否需要预先加载key,最后终于来到call函数中调用命令了。

static int processCommand(redisClient *c) {

    struct redisCommand *cmd;

     ---

     // 第一个字符是 * 表示后面是multi-bulk协议格式

    // 解析得到后面的data 项数
    if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
        c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
        if (c->multibulk <= 0) {
            resetClient(c);
            return 1;
        } else {
            decrRefCount(c->argv[c->argc-1]);
            c->argc--;
            return 1;
        }
    } else if (c->multibulk) {
        // 解析时对于普通的命令: c->bulklen始终 = -1,
        //前面已获得 c->multibulk值, c->bulklen一开始为-1,随后在 if (c->bulklen == -1) 中置为需要读取的字符个数,然后返回到processInputBuffer的else中处理得到输入的参数,然后再到这儿时就会进入 if (c->bulklen == -1) 的else中,将参数保存到mbargv中,这样一直到 c->multibulk为0,才解析完multibulk协议,进行下一步处理。
        if (c->bulklen == -1) {
            if (((char*)c->argv[0]->ptr)[0] != '$') {
                addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
                resetClient(c);
                return 1;
            } else {
                int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
                decrRefCount(c->argv[0]);
                if (bulklen < 0 || bulklen > 1024*1024*1024) {
                    c->argc--;
                    addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
                    resetClient(c);
                    return 1;
                }
                c->argc--;
                c->bulklen = bulklen+2; /* add two bytes for CR+LF */
                return 1;
            }
        } else {
            c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
            c->mbargv[c->mbargc] = c->argv[0];
            c->mbargc++;
            c->argc--;
            c->multibulk--;
            if (c->multibulk == 0) {
                robj **auxargv;
                int auxargc;

                /* Here we need to swap the multi-bulk argc/argv with the
                 * normal argc/argv of the client structure. */
                auxargv = c->argv;
                c->argv = c->mbargv;
                c->mbargv = auxargv;

                auxargc = c->argc;
                c->argc = c->mbargc;
                c->mbargc = auxargc;

                /* We need to set bulklen to something different than -1
                 * in order for the code below to process the command without
                 * to try to read the last argument of a bulk command as
                 * a special argument. */
                c->bulklen = 0;
                /* continue below and process the command */
            } else {
                c->bulklen = -1;
                return 1;
            }
        }
    }
    /* -- end of multi bulk commands processing -- */

    ---
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        freeClient(c);
        return 0;
    }
    ---
    cmd = lookupCommand(c->argv[0]->ptr);
    if (!cmd) {
        addReplySds(c,
            sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
                (char*)c->argv[0]->ptr));
        resetClient(c);
        return 1;
    } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
               (c->argc < -cmd->arity)) {
        addReplySds(c,
            sdscatprintf(sdsempty(),
                "-ERR wrong number of arguments for '%s' command\r\n",
                cmd->name));
        resetClient(c);
        return 1;
    } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
        /* This is a bulk command, we have to read the last argument yet. */
        int bulklen = atoi(c->argv[c->argc-1]->ptr);

        decrRefCount(c->argv[c->argc-1]);
        if (bulklen < 0 || bulklen > 1024*1024*1024) {
            c->argc--;
            addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
            resetClient(c);
            return 1;
        }
        c->argc--;
        c->bulklen = bulklen+2; /* add two bytes for CR+LF */
       ---
        if ((signed)sdslen(c->querybuf) >= c->bulklen) {
            c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
            c->argc++;
            c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
        } else {
            /* Otherwise return... there is to read the last argument
             * from the socket. */
            return 1;
        }
    }
    /* Let's try to encode the bulk object to save space. */
    if (cmd->flags & REDIS_CMD_BULK)
        c->argv[c->argc-1] = tryObjectEncoding(c->argv[c->argc-1]);

    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
        addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
        resetClient(c);
        return 1;

    }
    if (server.maxmemory) freeMemoryIfNeeded();
    if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) &&
        zmalloc_used_memory() > server.maxmemory)
    {
        addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
        resetClient(c);
        return 1;
    }

    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
        &&
        cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand &&
        cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) {
        addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n"));
        resetClient(c);
        return 1;
    }

    /* Exec the command */
    if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) {
        queueMultiCommand(c,cmd);
        addReply(c,shared.queued);
    } else {
        if (server.vm_enabled && server.vm_max_threads > 0 &&
            blockClientOnSwappedKeys(c,cmd)) return 1;
        call(c,cmd);
    }

    /* Prepare the client for the next command */
    resetClient(c);
    return 1;

}

call函数首先调用命令字绑定的处理函数,返回时检查是否修改数据,若有修改,则在aof启用的情况下,写aof log,并在数据改变或者强制复制的情况下向slaves复制,最后向monitors发送当前命令及参数。

/* Call() is the core of Redis execution of a command */
static void call(redisClient *c, struct redisCommand *cmd) {
    long long dirty;

    dirty = server.dirty;
    cmd->proc(c);
    dirty = server.dirty-dirty;

    if (server.appendonly && dirty)
        feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
    if ((dirty || cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
        listLength(server.slaves))
        replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
    if (listLength(server.monitors))
        replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
    server.stat_numcommands++;
}

最后介绍下命令表,定义如下:

struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    int flags;
    /* Use a function to determine which keys need to be loaded
     * in the background prior to executing this command. Takes precedence
     * over vm_firstkey and others, ignored when NULL */
    redisVmPreloadProc *vm_preload_proc;
    /* What keys should be loaded in background when calling this command? */
    int vm_firstkey; /* The first argument that's a key (0 = no keys) */
    int vm_lastkey;  /* THe last argument that's a key */
    int vm_keystep;  /* The step between first and last key */
};

对于每一个命令字,都有一个name和一个处理函数,对于某些key,在启用vm的情况下,需要使用vm_preload_proc预先加载某些key。下一节我们介绍下key的预先加载。

此条目发表在 redis 分类目录。将固定链接加入收藏夹。

redis源代码分析14–命令处理的一般过程》有 1 条评论

  1. Pingback 引用通告: redis源代码分析14–命令处理的一般过程 | Linux C++ 中文网

发表评论

电子邮件地址不会被公开。 必填项已被标记为 *

*

您可以使用这些 HTML 标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>