这个部分我们介绍下命令处理的一般过程。
在createClient时,为client的read事件设置了readQueryFromClient函数。我们来看看怎么处理client的命令的。
readQueryFromClient使用read一次读入REDIS_IOBUF_LEN字节,并保存在client中的querybuf参数中,然后调用processInputBuffer继续处理。
static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask){
redisClient *c =(redisClient*)privdata;
--
nread = read(fd, buf, REDIS_IOBUF_LEN);
---
if(nread){
c->querybuf = sdscatlen(c->querybuf, buf, nread);
c->lastinteraction = time(NULL);
} else {
return;
}
processInputBuffer(c);
}
processInputBuffer只处理不在REDIS_BLOCKED 和REDIS_IO_WAIT状态的client,也就是已ready好的client。另外如果c->bulklen ==-1(对于一般命令,c->bulklen都为-1,对于用multibulk协议传输的命令,下一个函数有更详细的介绍),则按行解析querybuf,并将解析到的参数保存在argv中,然后调用processCommand进行下一步处理,并且如果processCommand返回非0,会继续处理client输入。
static void processInputBuffer(redisClient *c) {
again:
---
if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
if (c->bulklen == -1) {
--
if (p) {
---
if (c->argc) {
---
if (processCommand(c) && sdslen(c->querybuf)) goto again;
}
--
}
} else {
--
int qbl = sdslen(c->querybuf);
if (processCommand(c) && sdslen(c->querybuf)) goto again;
return;
}
}
processCommand这段代码对multi-bulk 协议的解析写得真不敢恭维,转来转去的,真没劲。参看代码中的解释。解析完multibulk后,如果输入的命令是quit,则表示客户端退出了,释放其连接,返回0,表示不用继续处理了。接着使用 lookupCommand查看命令在cmdTable中对应的命令项,然后又是multbulk,接着检查安全认证情况,接着检查内存使用(前面内存章节中有介绍),接着查看 pubsub_channels、pubsub_patterns长度是否为0,若不为0,则表示处于订阅模式下(后文介绍),只允许命令 subscribeCommand、unsubscribeCommand、psubscribeCommand、 punsubscribeCommand。接着如果client处于事务模式下,则在命令不是execCommand、discardCommand的情况下将命令排队(事务处理后文也有介绍)。接着看看是否需要预先加载key,最后终于来到call函数中调用命令了。
static int processCommand(redisClient *c) {
struct redisCommand *cmd;
---
// 第一个字符是 * 表示后面是multi-bulk协议格式
// 解析得到后面的data 项数
if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
if (c->multibulk <= 0) {
resetClient(c);
return 1;
} else {
decrRefCount(c->argv[c->argc-1]);
c->argc--;
return 1;
}
} else if (c->multibulk) {
// 解析时对于普通的命令: c->bulklen始终 = -1,
//前面已获得 c->multibulk值, c->bulklen一开始为-1,随后在 if (c->bulklen == -1) 中置为需要读取的字符个数,然后返回到processInputBuffer的else中处理得到输入的参数,然后再到这儿时就会进入 if (c->bulklen == -1) 的else中,将参数保存到mbargv中,这样一直到 c->multibulk为0,才解析完multibulk协议,进行下一步处理。
if (c->bulklen == -1) {
if (((char*)c->argv[0]->ptr)[0] != '$') {
addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
resetClient(c);
return 1;
} else {
int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
decrRefCount(c->argv[0]);
if (bulklen < 0 || bulklen > 1024*1024*1024) {
c->argc--;
addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
resetClient(c);
return 1;
}
c->argc--;
c->bulklen = bulklen+2; /* add two bytes for CR+LF */
return 1;
}
} else {
c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
c->mbargv[c->mbargc] = c->argv[0];
c->mbargc++;
c->argc--;
c->multibulk--;
if (c->multibulk == 0) {
robj **auxargv;
int auxargc;
/* Here we need to swap the multi-bulk argc/argv with the
* normal argc/argv of the client structure. */
auxargv = c->argv;
c->argv = c->mbargv;
c->mbargv = auxargv;
auxargc = c->argc;
c->argc = c->mbargc;
c->mbargc = auxargc;
/* We need to set bulklen to something different than -1
* in order for the code below to process the command without
* to try to read the last argument of a bulk command as
* a special argument. */
c->bulklen = 0;
/* continue below and process the command */
} else {
c->bulklen = -1;
return 1;
}
}
}
/* -- end of multi bulk commands processing -- */
---
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
freeClient(c);
return 0;
}
---
cmd = lookupCommand(c->argv[0]->ptr);
if (!cmd) {
addReplySds(c,
sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
(char*)c->argv[0]->ptr));
resetClient(c);
return 1;
} else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
(c->argc < -cmd->arity)) {
addReplySds(c,
sdscatprintf(sdsempty(),
"-ERR wrong number of arguments for '%s' command\r\n",
cmd->name));
resetClient(c);
return 1;
} else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
/* This is a bulk command, we have to read the last argument yet. */
int bulklen = atoi(c->argv[c->argc-1]->ptr);
decrRefCount(c->argv[c->argc-1]);
if (bulklen < 0 || bulklen > 1024*1024*1024) {
c->argc--;
addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
resetClient(c);
return 1;
}
c->argc--;
c->bulklen = bulklen+2; /* add two bytes for CR+LF */
---
if ((signed)sdslen(c->querybuf) >= c->bulklen) {
c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
c->argc++;
c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
} else {
/* Otherwise return... there is to read the last argument
* from the socket. */
return 1;
}
}
/* Let's try to encode the bulk object to save space. */
if (cmd->flags & REDIS_CMD_BULK)
c->argv[c->argc-1] = tryObjectEncoding(c->argv[c->argc-1]);
/* Check if the user is authenticated */
if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
resetClient(c);
return 1;
}
if (server.maxmemory) freeMemoryIfNeeded();
if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) &&
zmalloc_used_memory() > server.maxmemory)
{
addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
resetClient(c);
return 1;
}
/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
&&
cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand &&
cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) {
addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n"));
resetClient(c);
return 1;
}
/* Exec the command */
if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) {
queueMultiCommand(c,cmd);
addReply(c,shared.queued);
} else {
if (server.vm_enabled && server.vm_max_threads > 0 &&
blockClientOnSwappedKeys(c,cmd)) return 1;
call(c,cmd);
}
/* Prepare the client for the next command */
resetClient(c);
return 1;
}
call函数首先调用命令字绑定的处理函数,返回时检查是否修改数据,若有修改,则在aof启用的情况下,写aof log,并在数据改变或者强制复制的情况下向slaves复制,最后向monitors发送当前命令及参数。
/* Call() is the core of Redis execution of a command */
static void call(redisClient *c, struct redisCommand *cmd) {
long long dirty;
dirty = server.dirty;
cmd->proc(c);
dirty = server.dirty-dirty;
if (server.appendonly && dirty)
feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
if ((dirty || cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
listLength(server.slaves))
replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
if (listLength(server.monitors))
replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
server.stat_numcommands++;
}
最后介绍下命令表,定义如下:
struct redisCommand {
char *name;
redisCommandProc *proc;
int arity;
int flags;
/* Use a function to determine which keys need to be loaded
* in the background prior to executing this command. Takes precedence
* over vm_firstkey and others, ignored when NULL */
redisVmPreloadProc *vm_preload_proc;
/* What keys should be loaded in background when calling this command? */
int vm_firstkey; /* The first argument that's a key (0 = no keys) */
int vm_lastkey; /* THe last argument that's a key */
int vm_keystep; /* The step between first and last key */
};
对于每一个命令字,都有一个name和一个处理函数,对于某些key,在启用vm的情况下,需要使用vm_preload_proc预先加载某些key。下一节我们介绍下key的预先加载。
Pingback 引用通告: redis源代码分析14–命令处理的一般过程 | Linux C++ 中文网