这一节介绍 Redis 虚拟内存(VM)中的多线程 I/O 机制。
先看多线程换出(swap out)的流程。
serverCron 函数调用 vmSwapOneObjectThreaded,以多线程方式开始换出 value;vmSwapOneObjectThreaded 会调用 vmSwapOneObject(参见上一节的解释),而 vmSwapOneObject 最终会调用 vmSwapObjectThreaded。
/* Begin swapping out 'val' (the value stored under 'key' in 'db') through the
 * I/O thread pool.
 *
 * Nothing is written here: a REDIS_IOJOB_PREPARE_SWAP job is queued so that a
 * worker thread can first compute how many swap pages the value needs. The
 * job takes its own reference to 'val', and the key is flagged as
 * REDIS_VM_SWAPPING while the job is in flight.
 *
 * Preconditions (asserted): the key is currently in memory and its refcount
 * is exactly 1.
 *
 * Always returns REDIS_OK. */
static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
    iojob *job;

    assert(key->storage == REDIS_VM_MEMORY);
    assert(key->refcount == 1);

    job = zmalloc(sizeof(*job));
    job->type = REDIS_IOJOB_PREPARE_SWAP;
    job->db = db;
    job->key = key;
    job->val = val;
    job->canceled = 0;
    job->thread = (pthread_t) -1;   /* not yet picked up by any thread */

    /* The queued job holds its own reference to the value. */
    incrRefCount(val);
    key->storage = REDIS_VM_SWAPPING;

    /* queueIOJob() must be called with the threaded-IO lock held. */
    lockThreadedIO();
    queueIOJob(job);
    unlockThreadedIO();

    return REDIS_OK;
}
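vmSwapObjectThreaded 会创建一个类型为 REDIS_IOJOB_PREPARE_SWAP 的 job,增加 val 的引用计数,并把 key 的 storage 标记为 REDIS_VM_SWAPPING,然后在加锁的情况下调用 queueIOJob 将 job 排队。queueIOJob 的主要工作就是把新 job 加入 server.io_newjobs,并在当前活动线程数(server.io_active_threads)小于配置的最大线程数(server.vm_max_threads)时,创建一个新的 I/O 线程。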
/* Append job 'j' to the tail of server.io_newjobs and, when fewer than
 * server.vm_max_threads I/O threads are active, start one more thread to
 * serve the queue.
 *
 * The caller must hold the threaded-IO lock (lockThreadedIO()). */
static void queueIOJob(iojob *j) {
    int needMoreThreads;

    redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
        (void*)j, j->type, (char*)j->key->ptr);
    listAddNodeTail(server.io_newjobs,j);

    needMoreThreads = server.io_active_threads < server.vm_max_threads;
    if (needMoreThreads) spawnIOThread();
}
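从 spawnIOThread 中可以知道,新线程的入口点是 IOThreadEntryPoint。spawnIOThread 在创建线程前会临时把信号掩码设为屏蔽 SIGCHLD、SIGHUP 和 SIGPIPE,让新线程继承这个掩码,创建完成后再恢复原来的掩码;如果 pthread_create 失败,会记录日志并每隔一秒重试,直到成功为止。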
/* Start one more detached I/O worker thread running IOThreadEntryPoint().
 *
 * Around pthread_create() the calling thread's signal mask is temporarily
 * set to block exactly SIGCHLD, SIGHUP and SIGPIPE. A new thread inherits its
 * creator's mask, so the worker starts with those signals blocked. The
 * caller's original mask is restored afterwards.
 *
 * If thread creation fails, the error is logged and creation is retried once
 * per second until it succeeds. On return, server.io_active_threads has been
 * incremented. */
static void spawnIOThread(void) {
    pthread_t thread;
    sigset_t mask, omask;
    int err;

    sigemptyset(&mask);
    sigaddset(&mask,SIGCHLD);
    sigaddset(&mask,SIGHUP);
    sigaddset(&mask,SIGPIPE);
    pthread_sigmask(SIG_SETMASK, &mask, &omask);
    while ((err = pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL)) != 0) {
        redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s",
            strerror(err));
        /* POSIX requires usleep()'s argument to be below one million
         * microseconds and allows EINVAL otherwise. usleep(1000000) could
         * therefore return immediately and turn this into a busy loop, so
         * wait the full second with sleep() instead. */
        sleep(1);
    }
    pthread_sigmask(SIG_SETMASK, &omask, NULL);
    server.io_active_threads++;
}
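IOThreadEntryPoint 会从 server.io_newjobs 取出最早的 job 并移入 server.io_processing,然后根据 job 类型完成相应工作(加载 value / 计算换出 value 所需的页数 / 换出 value),再将 job 从 server.io_processing 移入 server.io_processed。随后它向 server.io_ready_pipe_write 写入一个字节(io_ready_pipe_read 和 io_ready_pipe_write 是同一个管道的读、写两端)。vmThreadedIOCompletedJob 是主线程事件循环中的文件事件处理函数:管道读端变为可读后,主线程会调用它来完成后续工作(每读到一个字节就处理一个已完成的 job)。如果 io_newjobs 为空,工作线程会把 server.io_active_threads 减一后退出。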
/* Entry point of every I/O worker thread (started by spawnIOThread()).
 *
 * The thread detaches itself and then loops:
 *   1. Under the threaded-IO lock, it pops the oldest job from
 *      server.io_newjobs and appends it to server.io_processing. If
 *      io_newjobs is empty, it decrements server.io_active_threads and
 *      exits instead.
 *   2. Without holding the lock, it performs the work for the job type:
 *        REDIS_IOJOB_LOAD         - reads the value from swap into j->val;
 *        REDIS_IOJOB_PREPARE_SWAP - computes j->pages by serializing j->val
 *                                   to /dev/null (rdbSavedObjectPages());
 *        REDIS_IOJOB_DO_SWAP      - writes j->val to swap at j->page and
 *                                   sets j->canceled on failure.
 *   3. Under the lock, it moves the job from io_processing to io_processed.
 *   4. It writes one byte to server.io_ready_pipe_write so the main thread
 *      learns that one more completed job is waiting in io_processed.
 *
 * 'arg' is unused. Always returns NULL (on the exit path). */
static void *IOThreadEntryPoint(void *arg) {
    iojob *j;
    listNode *ln;
    REDIS_NOTUSED(arg);

    pthread_detach(pthread_self());
    while(1) {
        ssize_t nwritten;

        /* Get a new job to process */
        lockThreadedIO();
        if (listLength(server.io_newjobs) == 0) {
            /* No new jobs in queue, exit. */
            redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
                (long) pthread_self());
            server.io_active_threads--;
            unlockThreadedIO();
            return NULL;
        }
        ln = listFirst(server.io_newjobs);
        j = ln->value;
        listDelNode(server.io_newjobs,ln);
        /* Add the job in the processing queue */
        j->thread = pthread_self();
        listAddNodeTail(server.io_processing,j);
        ln = listLast(server.io_processing); /* We use ln later to remove it */
        unlockThreadedIO();
        redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
            (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);

        /* Process the Job (the lock is not held here) */
        if (j->type == REDIS_IOJOB_LOAD) {
            j->val = vmReadObjectFromSwap(j->page,j->key->vtype);
        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
            /* Serialize the value into /dev/null only to learn how many
             * swap pages it would occupy. */
            FILE *fp = fopen("/dev/null","w+");
            assert(fp != NULL);
            j->pages = rdbSavedObjectPages(j->val,fp);
            fclose(fp);
        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
            if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
                j->canceled = 1;
        }

        /* Done: insert the job into the processed queue */
        redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
            (long) pthread_self(), (void*)j, (char*)j->key->ptr);
        lockThreadedIO();
        listDelNode(server.io_processing,ln);
        listAddNodeTail(server.io_processed,j);
        unlockThreadedIO();

        /* Signal the main thread there is new stuff to process. The write()
         * must not be placed inside assert(): under NDEBUG the whole assert
         * expression is compiled out. The notification would be silently
         * dropped and the job left stranded in io_processed. Retry if the
         * call is interrupted by a signal before anything is written. */
        do {
            nwritten = write(server.io_ready_pipe_write,"x",1);
        } while (nwritten == -1 && errno == EINTR);
        assert(nwritten == 1);
        (void) nwritten; /* avoid unused-variable warnings under NDEBUG */
    }
    return NULL; /* never reached */
}
/* Post-process completed threaded VM I/O jobs in the main thread.
 *
 * The signature matches an aeEventLoop file-event handler. Presumably it is
 * registered on the read end of the pipe that IOThreadEntryPoint() writes one
 * byte to after each completed job (server.io_ready_pipe_write). The
 * registration is not visible here; TODO confirm.
 *
 * For every byte read from 'fd', the oldest job in server.io_processed is
 * popped under the threaded-IO lock and handled here. At this point it can
 * touch the keyspace without racing the worker threads:
 *   - Canceled jobs are freed and skipped. They do not count toward the
 *     per-call budget below.
 *   - REDIS_IOJOB_LOAD: the key goes back to REDIS_VM_MEMORY, its swap pages
 *     are marked free, the loaded value is installed in the dict, and clients
 *     blocked on the key are handled.
 *   - REDIS_IOJOB_PREPARE_SWAP: swap pages are reserved and the job is
 *     re-queued as REDIS_IOJOB_DO_SWAP. If swapping is not possible right
 *     now, the job is dropped and the key reverts to REDIS_VM_MEMORY.
 *   - REDIS_IOJOB_DO_SWAP: the key records its page range and becomes
 *     REDIS_VM_SWAPPED, and the in-memory value is released. If memory is
 *     still above server.vm_max_memory, more swap-out jobs are queued.
 *
 * Per call, at most REDIS_MAX_COMPLETED_JOBS_PROCESSED percent (and at least
 * one) of the jobs found in io_processed on the first iteration are handled.
 * Bytes not consumed stay in the pipe; presumably the event loop calls this
 * handler again while the fd stays readable (not visible here).
 *
 * A non-NULL 'privdata' disables queuing further swap-out jobs.
 * read(2) errors other than EAGAIN are logged. */
static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
int mask)
{
char buf[1];
int retval, processed = 0, toprocess = -1, trytoswap = 1;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
/* NOTE(review): privdata is marked unused here but is read on the next
 * line; the REDIS_NOTUSED is redundant. */
REDIS_NOTUSED(privdata);
if (privdata != NULL) trytoswap = 0; /* check the comments above... */
/* For every byte we read in the read side of the pipe, there is one
* I/O job completed to process. */
while((retval = read(fd,buf,1)) == 1) {
iojob *j;
listNode *ln;
robj *key;
struct dictEntry *de;
redisLog(REDIS_DEBUG,"Processing I/O completed job");
/* Get the processed element (the oldest one) */
lockThreadedIO();
assert(listLength(server.io_processed) != 0);
/* The per-call budget is computed once, from the queue length seen on
 * the first iteration, and is never less than 1. */
if (toprocess == -1) {
toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100;
if (toprocess <= 0) toprocess = 1;
}
ln = listFirst(server.io_processed);
j = ln->value;
listDelNode(server.io_processed,ln);
unlockThreadedIO();
/* If this job is marked as canceled, just ignore it */
/* (The 'continue' skips processed++, so canceled jobs do not count
 * toward toprocess.) */
if (j->canceled) {
freeIOJob(j);
continue;
}
/* Post process it in the main thread, as there are things we
* can do just here to avoid race conditions and/or invasive locks */
redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
/* The key must still be in the dict. The key object stored in the dict
 * entry is the one updated below. */
de = dictFind(j->db->dict,j->key);
assert(de != NULL);
key = dictGetEntryKey(de);
if (j->type == REDIS_IOJOB_LOAD) {
redisDb *db;
/* Key loaded, bring it at home */
key->storage = REDIS_VM_MEMORY;
key->vm.atime = server.unixtime;
vmMarkPagesFree(key->vm.page,key->vm.usedpages);
redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)",
(unsigned char*) key->ptr);
server.vm_stats_swapped_objects--;
server.vm_stats_swapins++;
/* Install the loaded value in the dict and take a reference for it;
 * the job's own reference is presumably dropped by freeIOJob(). */
dictGetEntryVal(de) = j->val;
incrRefCount(j->val);
/* Save db before freeing the job: j must not be used after freeIOJob(). */
db = j->db;
freeIOJob(j);
/* Handle clients waiting for this key to be loaded. */
handleClientsBlockedOnSwappedKey(db,key);
} else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
/* Now we know the amount of pages required to swap this object.
* Let's find some space for it, and queue this task again
* rebranded as REDIS_IOJOB_DO_SWAP. */
if (!vmCanSwapOut() ||
vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
{
/* Ooops... no space or we can't swap as there is
* a fork()ed Redis trying to save stuff on disk. */
freeIOJob(j);
key->storage = REDIS_VM_MEMORY; /* undo operation */
} else {
/* Note that we need to mark this pages as used now,
* if the job will be canceled, we'll mark them as freed
* again. */
vmMarkPagesUsed(j->page,j->pages);
j->type = REDIS_IOJOB_DO_SWAP;
/* queueIOJob() requires the threaded-IO lock. */
lockThreadedIO();
queueIOJob(j);
unlockThreadedIO();
}
} else if (j->type == REDIS_IOJOB_DO_SWAP) {
robj *val;
/* Key swapped. We can finally free some memory. */
/* Debug dump to stdout, printed just before the redisAssert below
 * fails when the key is no longer in the SWAPPING state. */
if (key->storage != REDIS_VM_SWAPPING) {
printf("key->storage: %d\n",key->storage);
printf("key->name: %s\n",(char*)key->ptr);
printf("key->refcount: %d\n",key->refcount);
printf("val: %p\n",(void*)j->val);
printf("val->type: %d\n",j->val->type);
printf("val->ptr: %s\n",(char*)j->val->ptr);
}
redisAssert(key->storage == REDIS_VM_SWAPPING);
val = dictGetEntryVal(de);
key->vm.page = j->page;
key->vm.usedpages = j->pages;
key->storage = REDIS_VM_SWAPPED;
key->vtype = j->val->type;
decrRefCount(val); /* Deallocate the object from memory. */
dictGetEntryVal(de) = NULL;
redisLog(REDIS_DEBUG,
"VM: object %s swapped out at %lld (%lld pages) (threaded)",
(unsigned char*) key->ptr,
(unsigned long long) j->page, (unsigned long long) j->pages);
server.vm_stats_swapped_objects++;
server.vm_stats_swapouts++;
freeIOJob(j);
/* Put a few more swap requests in queue if we are still
* out of memory */
if (trytoswap && vmCanSwapOut() &&
zmalloc_used_memory() > server.vm_max_memory)
{
int more = 1;
/* NOTE(review): 'more' is evaluated before the swap call, so
 * vmSwapOneObjectThreaded() runs at least once per pass, even when
 * io_newjobs already holds vm_max_threads jobs. Confirm whether
 * this is intended. */
while(more) {
lockThreadedIO();
more = listLength(server.io_newjobs) <
(unsigned) server.vm_max_threads;
unlockThreadedIO();
/* Don't waste CPU time if swappable objects are rare. */
if (vmSwapOneObjectThreaded() == REDIS_ERR) {
trytoswap = 0;
break;
}
}
}
}
processed++;
/* Budget exhausted: leave remaining notifications in the pipe. */
if (processed == toprocess) return;
}
/* The loop ends on EOF (retval == 0, ignored) or a read error. EAGAIN
 * means the pipe is drained; presumably fd is non-blocking (TODO confirm). */
if (retval < 0 && errno != EAGAIN) {
redisLog(REDIS_WARNING,
"WARNING: read(2) error in vmThreadedIOCompletedJob() %s",
strerror(errno));
}
}