VM按value换入换出的策略又有两种使用方式:阻塞方式和多线程方式(server.vm_max_threads == 0 时为阻塞方式)。
这一节主要介绍阻塞方式。
redis 启动重建db(aof方式或者快照方式)时,可能会因为内存限制将某些value换出到磁盘,此时只使用阻塞方式换出value(vmSwapOneObjectBlocking函数)。除此之外,redis只在serverCron函数(该函数在事件处理章节分析过)中换出value。我们来看看serverCron中的处理代码:阻塞方式使用函数vmSwapOneObjectBlocking换出value,多线程方式使用函数vmSwapOneObjectThreaded换出value。
/* serverCron (excerpt): only the VM swap-out part is shown; the lines marked
 * --- stand for code omitted from this excerpt.
 *
 * While VM is enabled and zmalloc_used_memory() exceeds server.vm_max_memory,
 * the loop first tries to release an object from the free list and otherwise
 * swaps one value out. */
static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
---
/* Swap a few keys on disk if we are over the memory limit and VM
* is enabled. Try to free objects from the free list first. */
if (vmCanSwapOut()) {
while (server.vm_enabled && zmalloc_used_memory() >
server.vm_max_memory)
{
---
/* A free-list object was released: re-check the memory limit before
 * swapping anything out. */
if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
/* vm_max_threads == 0 selects the blocking swap-out, any other value the
 * threaded one. The declaration of retval and the handling of its value
 * are in the omitted lines. */
retval = (server.vm_max_threads == 0) ?
vmSwapOneObjectBlocking() :
vmSwapOneObjectThreaded();
---
}
}
---
/* NOTE(review): presumably the delay (in milliseconds) before this callback
 * is invoked again -- confirm against the event loop's timer contract. */
return 100;
}
无论是阻塞方式vmSwapOneObjectBlocking换出value,还是多线程方式vmSwapOneObjectThreaded换出value,最终都调用vmSwapOneObject(调用参数不一样)来换出value。
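vmSwapOneObject会对每个非空的db随机抽取5项(不在内存中的项,以及多线程方式下被共享的value会被跳过且不计入这5次,这类不计数的尝试有上限,由maxtries = 100控制),计算各项的swappability,并在所有db中选出swappability最大的一项。然后如果是多线程方式,则调用vmSwapObjectThreaded来换出该value,否则使用vmSwapObjectBlocking换出该value。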
/* Select one in-memory value and swap it out.
 *
 * Every non-empty database is sampled with dictGetRandomKey(); each eligible
 * entry is scored with computeObjectSwappability() and the entry with the
 * highest score over all databases becomes the one to swap.
 *
 * usethreads: non-zero hands the value to vmSwapObjectThreaded(), zero
 *             swaps it through vmSwapObjectBlocking().
 *
 * Returns REDIS_OK after the value was handed to the threaded swapper or
 * after the blocking swap succeeded; REDIS_ERR when no candidate was found
 * or the blocking swap failed. */
static int vmSwapOneObject(int usethreads) {
    struct dictEntry *candidate = NULL;
    redisDb *candidate_db = NULL;
    double top_score = 0;
    robj *key, *val;
    int dbidx;

    for (dbidx = 0; dbidx < server.dbnum; dbidx++) {
        redisDb *db = &server.db[dbidx];
        /* 100 uncounted tries: this way (usually) one object is found even
         * if just 1% - 2% of the objects are swappable. */
        int budget = 100;
        int sampled = 0;

        if (dictSize(db->dict) == 0) continue;

        /* Collect five counted samples from this database. */
        while (sampled < 5) {
            dictEntry *de;
            double score;
            int ineligible;

            if (budget) budget--;
            de = dictGetRandomKey(db->dict);
            key = dictGetEntryKey(de);
            val = dictGetEntryVal(de);

            /* Only objects currently in memory can be swapped. With the
             * threaded VM shared values are skipped as well: the main thread
             * must not touch an object the I/O thread is using, and other
             * keys referencing it cannot be controlled without an extra
             * mutex. The || keeps val untouched when the key is not in
             * memory. */
            ineligible = key->storage != REDIS_VM_MEMORY ||
                         (server.vm_max_threads != 0 && val->refcount != 1);
            if (ineligible) {
                /* A skipped entry only counts as a sample once the budget
                 * of free tries is exhausted. */
                if (!budget) sampled++;
                continue;
            }

            /* The access time is maintained on the key object; copy it to
             * the value before scoring. */
            val->vm.atime = key->vm.atime;
            score = computeObjectSwappability(val);
            if (candidate == NULL || score > top_score) {
                candidate = de;
                top_score = score;
                candidate_db = db;
            }
            sampled++;
        }
    }

    if (candidate == NULL) return REDIS_ERR;

    key = dictGetEntryKey(candidate);
    val = dictGetEntryVal(candidate);
    redisLog(REDIS_DEBUG,"Key with best swappability: %s, %f",
        key->ptr, top_score);

    /* The swap code needs a private key object: replace a shared key with
     * its own copy inside the dictionary entry. */
    if (key->refcount > 1) {
        robj *private_key = dupStringObject(key);

        decrRefCount(key);
        dictGetEntryKey(candidate) = private_key;
        key = private_key;
    }

    if (usethreads) {
        vmSwapObjectThreaded(key,val,candidate_db);
        return REDIS_OK;
    }

    if (vmSwapObjectBlocking(key,val) != REDIS_OK) return REDIS_ERR;
    /* The value now lives on disk only: clear the in-memory slot. */
    dictGetEntryVal(candidate) = NULL;
    return REDIS_OK;
}
vmSwapObjectBlocking会先计算value所需的交换页数并找到一段连续的空闲页,然后以阻塞方式将value写到vm文件中(函数vmWriteObjectOnSwap),写入成功后更新key中的vm信息、释放内存中的value,最后标记相应vm页为已使用;查找空闲页或写入失败时直接返回REDIS_ERR。
/* Swap the value 'val' of 'key' out to the swap file synchronously.
 *
 * key: the key object; must be in REDIS_VM_MEMORY state and unshared
 *      (refcount == 1), both enforced with assert().
 * val: the value to write; on success one reference to it is dropped.
 *
 * On success the key records the swap location (vm.page, vm.usedpages), its
 * storage becomes REDIS_VM_SWAPPED, vtype keeps the value's type, the pages
 * are marked used and the swap statistics are incremented.
 *
 * Returns REDIS_OK on success, REDIS_ERR when no contiguous run of pages
 * was found or the write failed; in both failure cases key and val are left
 * unmodified by this function. */
static int vmSwapObjectBlocking(robj *key, robj *val) {
    off_t pages = rdbSavedObjectPages(val,NULL);
    off_t page;

    assert(key->storage == REDIS_VM_MEMORY);
    assert(key->refcount == 1);
    if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR;
    if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return REDIS_ERR;
    key->vm.page = page;
    key->vm.usedpages = pages;
    key->storage = REDIS_VM_SWAPPED;
    key->vtype = val->type;
    decrRefCount(val); /* Deallocate the object from memory. */
    vmMarkPagesUsed(page,pages);
    /* The casts match the conversion specifiers exactly: %s takes a char
     * pointer and %lld a (signed) long long; off_t is a signed type. */
    redisLog(REDIS_DEBUG,"VM: object %s swapped out at %lld (%lld pages)",
        (char*) key->ptr,
        (long long) page, (long long) pages);
    server.vm_stats_swapped_objects++;
    server.vm_stats_swapouts++;
    return REDIS_OK;
}
对于value的加载,如果是多线程方式,会使用blockClientOnSwappedKeys提前加载,但阻塞方式则只有到相应命令执行时才会加载。最终无论是阻塞方式还是多线程方式,都会调用lookupKey来查找key是否在内存中,若不在,则使用vmLoadObject加载value,该函数是阻塞式的读入value。
/* Look up 'key' in db->dict and return its value, or NULL when the key is
 * not present.
 *
 * When VM is enabled:
 *  - key in memory or being swapped out: a pending swap-out is cancelled
 *    with vmCancelThreadedIOJob() and the key's access time is refreshed
 *    from server.unixtime;
 *  - otherwise the value is loaded with vmLoadObject(), stored back into
 *    the dictionary entry, and, if the key was in REDIS_VM_LOADING state,
 *    handleClientsBlockedOnSwappedKey() is called for it. */
static robj *lookupKey(redisDb *db, robj *key) {
    dictEntry *entry = dictFind(db->dict,key);
    robj *stored_key, *val;

    if (entry == NULL) return NULL;

    /* From here on work with the key object stored in the dictionary, not
     * with the caller's lookup key. */
    stored_key = dictGetEntryKey(entry);
    val = dictGetEntryVal(entry);
    if (!server.vm_enabled) return val;

    if (stored_key->storage != REDIS_VM_MEMORY &&
        stored_key->storage != REDIS_VM_SWAPPING)
    {
        /* Sample the state before loading the value. */
        int was_loading = (stored_key->storage == REDIS_VM_LOADING);

        /* Our value was swapped on disk. Bring it at home. */
        redisAssert(val == NULL);
        val = vmLoadObject(stored_key);
        dictGetEntryVal(entry) = val;
        /* Clients blocked by the VM subsystem may be waiting for
         * this key... */
        if (was_loading) handleClientsBlockedOnSwappedKey(db,stored_key);
        return val;
    }

    /* If we were swapping the object out, stop it, this key was requested. */
    if (stored_key->storage == REDIS_VM_SWAPPING)
        vmCancelThreadedIOJob(stored_key);
    /* Update the access time of the key for the aging algorithm. */
    stored_key->vm.atime = server.unixtime;
    return val;
}