redis源代码分析22–协议

redis默认使用tcp协议的6379端口,其协议是文本行格式的而不是二进制格式的,每一行都以”\r\n”结尾,非常容易理解。

参考ProtocolSpecification.html就知道,发布到redis的命令有如下几种返回格式(对于不存在的值,会返回-1,此时client library应返回合适的nil对象(比如C语言的NULL),而不是空字符串):

1)第一个字节是字符“-”,后面跟着一行出错信息(error reply)

比如lpop命令,当操作的对象不是一个链表时,会返回如下出错信息:

“-ERR Operation against a key holding the wrong kind of value\r\n”

2)第一个字节是字符“+”,后面跟着一行表示执行结果的提示信息(line reply)

比如set命令执行成功后,会返回”+OK\r\n”

3)第一个字节是字符“$”,后面先跟一行,仅有一个数字,该数字表示下一行字符的个数(若不存在,则数字为-1)(bulk reply)

比如get命令,成功时返回值的类似于“$7\r\nmyvalue”,不存在时返回的信息为“$-1\r\n”

4)第一个字节是字符“*”,后面先跟一行,仅有一个数字,该数字表示bulk reply的个数(若不存在,则数字为-1)(multi-bulk reply)

比如lrange命令,若要求返回0–2之间的值,则成功时返回值类似于”*3\r\n$6\r\nvalue1\r\n$7\r\nmyvalue\r\n$5\r\nhello\r\n”,不存在时返回的信息类似于”*-1\r\n”。

5)第一个字节是字符“:”,后面跟着一个整数值(integer reply)

比如incr命令,成功时会返回对象+1后的值。

而client发布命令的格式有如下几种,第一个字符串都必须是命令字,不同的参数之间用1个空格来分隔:
1)Inline Command::仅一行

比如 EXISTS命令,client发送的字节流类似于”EXISTS mykey\r\n”。
2)Bulk Command:类似于返回协议的bulk reply,一般有两行,第一行依次为“命令字 参数 一个数字”,该数字表示下一行字符的个数

比如SET命令,client发送的字节流类似于”SET mykey 5\r\nhello\r\n”。

3)multi-bulk Command:跟返回协议的multi-bulk reply类似。

比如上面的SET命令,用multi-bulk协议表示则为“*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$5\r\nhello\r\n”。

尽管对于某些命令该协议发送的字节流比bulk command形式要多,但它可以支持任何一种命令,支持跟多个二进制安全参数的命令(bulk command仅支持一个),也可以使得client library不修改代码就能支持redis新发布的命令(只要把不支持的命令按multi-bulk形式发布即可)。redis的官方文档中还提到,未来可能仅支持client采用multi-bulk Command格式发布命令。

另外提一下,client library可以连续发布多条命令,而不是等到redis返回前一条命令的执行结果才发布新的命令,这种机制被称作pipelining,支持redis的client library大多支持这种机制,读者可自行参考。

最后来看看redis实现时用来返回信息的相关函数。

redis 会使用addReplySds、addReplyDouble、addReplyLongLong、addReplyUlong、 addReplyBulkLen、addReplyBulk、addReplyBulkCString等来打包不同的返回信息,最终调用addReply 来发送信息。

addReply会将发送信息添加到相应redisClient的reply链表尾部,并使用 sendReplyToClient来发送。sendReplyToClient会遍历reply链表,并依次发送,其间如果可以打包 reply(server.glueoutputbuf为真),则可以使用glueReplyBuffersIfNeeded把reply链表中的值合并到一个缓冲区,然后一次性发送。

static void addReply(redisClient *c, robj *obj) {
    if (listLength(c->reply) == 0 &&
        (c->replstate == REDIS_REPL_NONE ||
         c->replstate == REDIS_REPL_ONLINE) &&
        aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
        sendReplyToClient, c) == AE_ERR) return;

    if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
        obj = dupStringObject(obj);
        obj->refcount = 0; /* getDecodedObject() will increment the refcount */
    }
    listAddNodeTail(c->reply,getDecodedObject(obj));
}

static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Use writev() if we have enough buffers to send */
    if (!server.glueoutputbuf &&
        listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
        !(c->flags & REDIS_MASTER))
    {
        sendReplyToClientWritev(el, fd, privdata, mask);
        return;
    }

    while(listLength(c->reply)) {
        if (server.glueoutputbuf && listLength(c->reply) > 1)
            glueReplyBuffersIfNeeded(c);

        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        if (c->flags & REDIS_MASTER) {
            /* Don't reply to a master */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
        }
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
        /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interfae) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            redisLog(REDIS_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}

关于client library的实现,可按照前面介绍的格式自己实现,也可以阅读现有的client library来加深理解。

此条目发表在 redis 分类目录。将固定链接加入收藏夹。

发表评论

电子邮件地址不会被公开。 必填项已被标记为 *

*

您可以使用这些 HTML 标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>