redis源代码分析11–client连接(上)

接下来的三节我们介绍client连接,按接受client连接、client连接的3个核心函数、client连接的几个标志3个部分顺序介绍。

这一节我们介绍下redis如何接受client连接。

在main函数中调用的initServer中,可看到如下代码:

static void initServer() {
  ---
   server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
  ---
   if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
        acceptHandler, NULL) == AE_ERR) oom("creating file event");
  ---
}


anetTcpServer 也就是socket、bind、listen的封装,返回listen后的fd,并保存在全局的server.fd中,然后调用 aeCreateFileEvent,使server.fd监听read事件,并在read事件响应后(也就是有新的client来到时),调用 acceptHandler来处理。可以看到acceptHandler最终调用系统api accept来处理。在acceptHandler返回后(尽管acceptHandler仅等待一个client连接,但由于server.fd的read事件一直被监听,所以会在aeProcessEvents反复被处理,从而导致acceptHandler反复被调用),acceptHandler调用createClient返回一个redisClient结构,保存新的client连接的状态。如果客户端连接数过多,则向客户端返回出错信息,并释放连接。从这里可以看出,redis对client连接的处理完全是使用事件来处理的,没有多进程,没有多线程。

static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
     ---
    cfd = anetAccept(server.neterr, fd, cip, &cport);
    ---
    if ((c = createClient(cfd)) == NULL) {
        redisLog(REDIS_WARNING,"Error allocating resoures for the client");
        close(cfd); /* May be already closed, just ingore errors */
        return;
    }
    /* If maxclient directive is set and this is one client more... close the
     * connection. Note that we create the client instead to check before
     * for this condition, since now the socket is already set in nonblocking
     * mode and we can send an error for free using the Kernel I/O */
    if (server.maxclients && listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        freeClient(c);
        return;

    }

    server.stat_numconnections++;
}

int anetAccept(char *err, int serversock, char *ip, int *port)
{
    int fd;
    struct sockaddr_in sa;
    unsigned int saLen;

    while(1) {
        saLen = sizeof(sa);
        fd = accept(serversock, (struct sockaddr*)&sa, &saLen);
        if (fd == -1) {
            if (errno == EINTR)
                continue;
            else {
                anetSetError(err, "accept: %s\n", strerror(errno));
                return ANET_ERR;
            }
        }
        break;
    }
    if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
    if (port) *port = ntohs(sa.sin_port);
    return fd;
}
此条目发表在 redis 分类目录。将固定链接加入收藏夹。

发表评论

电子邮件地址不会被公开。 必填项已被标记为 *

*

您可以使用这些 HTML 标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>