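VM (virtual memory) is a feature added in Redis 2.0. Before VM, Redis kept all the data of every db in memory, so memory usage kept growing as the server ran, even though clients access some data far more often than the rest. VM is Redis's attempt to address this: in short, values that are rarely accessed are saved to disk. The keys of all values, however, stay in memory, so that looking up a value that has been swapped out performs about the same as it did before VM was enabled.
VM is one of the most complex modules in Redis, so we cover it in three parts. This part introduces the main data structures VM relies on, the next part covers the non-blocking mode, and the last part covers the multithreaded mode.
Let's start with Redis's generic object structure, redisObject: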
// Where an object lives when VM is enabled
#define REDIS_VM_MEMORY 0       /* The object is on memory */
#define REDIS_VM_SWAPPED 1      /* The object is on disk */
#define REDIS_VM_SWAPPING 2     /* Redis is swapping this object on disk */
#define REDIS_VM_LOADING 3      /* Redis is loading this object from disk */
/* The VM object structure */
struct redisObjectVM {
    off_t page;         /* the page at which the object is stored on disk */
    off_t usedpages;    /* number of pages used on disk */
    time_t atime;       /* Last access time */
} vm;
/* The actual Redis Object */
// Generic object type
// For a key, extra fields record where its value is stored, the value's type, and so on
typedef struct redisObject {
    void *ptr;
    unsigned char type;
    unsigned char encoding;
    unsigned char storage;  /* If this object is a key, where is the value?
                             * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
    unsigned char vtype;    /* If this object is a key, and value is swapped out,
                             * this is the type of the swapped out object. */
    int refcount;
    /* VM fields, these are only allocated if VM is active, otherwise the
     * object allocation function will just allocate
     * sizeof(redisObject) minus sizeof(redisObjectVM), so using
     * Redis without VM active will not have any overhead. */
    struct redisObjectVM vm;
} robj;
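In robj, type holds the object's type, such as string, list, or set. storage records where the value belonging to this key object currently lives: in memory, on disk, being swapped out to disk, or being loaded back. vtype is the type of the value belonging to this key object. page and usedpages record where that value is stored in the swap file (its first page and the number of pages it occupies), and atime is the time the value was last accessed. So when the storage of the key object represented by a robj is REDIS_VM_SWAPPED, the key's value is no longer in memory and has to be loaded from the swap file starting at page; the value's type is vtype and it spans usedpages pages.
When an object is created, the amount of memory allocated for the robj depends on whether VM is enabled.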
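To make the role of storage, page and usedpages concrete, here is a minimal sketch of how a swapped value could be located in the swap file. The swapInfo struct and both helper functions are hypothetical stand-ins, not Redis code, and the sketch assumes that page N starts at byte N * page_size, where page_size stands for the vm-page-size setting.

```c
#include <assert.h>
#include <sys/types.h>

/* Storage states, copied from the listing above. */
#define REDIS_VM_MEMORY 0
#define REDIS_VM_SWAPPED 1

/* Hypothetical, trimmed-down stand-in for the key-side fields of robj. */
typedef struct swapInfo {
    unsigned char storage;  /* where the value lives */
    off_t page;             /* first page of the value in the swap file */
    off_t usedpages;        /* number of consecutive pages it occupies */
} swapInfo;

/* Byte offset of the swapped value in the swap file, or -1 if the value
 * is not swapped out. Assumes page N starts at byte N * page_size. */
off_t swappedValueOffset(const swapInfo *k, off_t page_size) {
    assert(page_size > 0);
    if (k->storage != REDIS_VM_SWAPPED) return -1;
    return k->page * page_size;
}

/* Size in bytes of the on-disk area reserved for the value. The value
 * itself may be shorter, since the last page need not be full. */
off_t swappedValueReservedBytes(const swapInfo *k, off_t page_size) {
    assert(page_size > 0);
    if (k->storage != REDIS_VM_SWAPPED) return 0;
    return k->usedpages * page_size;
}
```

With a 32-byte page size, a key whose value sits at page 10 and uses 3 pages would be read starting at byte 320, from an area of 96 bytes.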
static robj *createObject(int type, void *ptr) {
    ---
    else {
        if (server.vm_enabled) {
            pthread_mutex_unlock(&server.obj_freelist_mutex);
            o = zmalloc(sizeof(*o));
        } else {
            o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
        }
    }
    ---
    if (server.vm_enabled) {
        /* Note that this code may run in the context of an I/O thread
         * and accessing to server.unixtime in theory is an error
         * (no locks). But in practice this is safe, and even if we read
         * garbage Redis will not fail, as it's just a statistical info */
        o->vm.atime = server.unixtime;
        o->storage = REDIS_VM_MEMORY;
    }
    return o;
}
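The allocation trick works only because the VM fields are the last member of the struct. The following is a small stand-alone sketch of the same idea; demoVM, demoObj and both functions are hypothetical miniatures rather than the Redis definitions, and plain malloc stands in for zmalloc.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <time.h>

/* Hypothetical miniature of the VM fields. */
struct demoVM {
    long page;
    long usedpages;
    time_t atime;
};

/* Hypothetical miniature of robj: the VM fields must come last. */
typedef struct demoObj {
    void *ptr;
    unsigned char type;
    int refcount;
    struct demoVM vm;   /* only backed by memory when VM is enabled */
} demoObj;

/* Bytes to allocate for one object, with or without the VM tail. */
size_t demoObjAllocSize(int vm_enabled) {
    size_t size = sizeof(demoObj);
    if (!vm_enabled) size -= sizeof(struct demoVM);
    /* The truncated size must still cover every field before vm. */
    assert(size >= offsetof(demoObj, vm));
    return size;
}

/* Allocate and initialize an object; returns NULL on out of memory. */
demoObj *demoCreateObject(int vm_enabled, unsigned char type, void *ptr) {
    demoObj *o = malloc(demoObjAllocSize(vm_enabled));
    if (o == NULL) return NULL;
    o->ptr = ptr;
    o->type = type;
    o->refcount = 1;
    /* Without VM the tail was never allocated, so o->vm is off limits. */
    if (vm_enabled) o->vm.atime = time(NULL);
    return o;
}
```

The price of the saved bytes is a rule every caller has to follow: code that reads or writes the vm member must first check that VM is enabled, exactly as createObject does before setting atime.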
All VM-related state is kept in the following fields of redisServer.
/* Global server state structure */
struct redisServer {
    ---
    /* Virtual memory state */
    FILE *vm_fp;
    int vm_fd;
    off_t vm_next_page; /* Next probably empty page */
    off_t vm_near_pages; /* Number of pages allocated sequentially */
    unsigned char *vm_bitmap; /* Bitmap of free/used pages */
    time_t unixtime;    /* Unix time sampled every second. */
    /* Virtual memory I/O threads stuff */
    /* An I/O thread process an element taken from the io_jobs queue and
     * put the result of the operation in the io_done list. While the
     * job is being processed, it's put on io_processing queue. */
    list *io_newjobs; /* List of VM I/O jobs yet to be processed */
    list *io_processing; /* List of VM I/O jobs being processed */
    list *io_processed; /* List of VM I/O jobs already processed */
    list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
    pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
    pthread_mutex_t obj_freelist_mutex; /* safe redis objects creation/free */
    pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
    pthread_attr_t io_threads_attr; /* attributes for threads creation */
    int io_active_threads; /* Number of running I/O threads */
    int vm_max_threads; /* Max number of I/O threads running at the same time */
    /* Our main thread is blocked on the event loop, locking for sockets ready
     * to be read or written, so when a threaded I/O operation is ready to be
     * processed by the main thread, the I/O thread will use a unix pipe to
     * awake the main thread. The followings are the two pipe FDs. */
    int io_ready_pipe_read;
    int io_ready_pipe_write;
    /* Virtual memory stats */
    unsigned long long vm_stats_used_pages;
    unsigned long long vm_stats_swapped_objects;
    unsigned long long vm_stats_swapouts;
    unsigned long long vm_stats_swapins;
    ---
};
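vm_fp and vm_fd both refer to the swap file on disk (a stdio stream and its underlying file descriptor), and the swap file is read and written through them. vm_bitmap tracks which pages of the swap file are allocated: there is one bit per page, 0 meaning the page is free and 1 meaning it is in use. The size of a page is configured with vm-page-size and the number of pages with vm-pages. It is worth noting that a page holds at most one object, while one object may be stored across several consecutive pages. unixtime is just a cached timestamp; it is what gets copied into atime, and so it is used when estimating how recently a value was accessed. The fields that follow are for swapping values in and out with multiple threads. In threaded mode each swap-in or swap-out of a value is treated as a job, and there are the following job types: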
/* VM threaded I/O request message */
#define REDIS_IOJOB_LOAD 0          /* Load from disk to memory */
#define REDIS_IOJOB_PREPARE_SWAP 1  /* Compute needed pages */
#define REDIS_IOJOB_DO_SWAP 2       /* Swap from memory to disk */
typedef struct iojob {
    int type;   /* Request type, REDIS_IOJOB_* */
    redisDb *db;/* Redis database */
    robj *key;  /* This I/O request is about swapping this key */
    robj *val;  /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
                 * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
    off_t page; /* Swap page where to read/write the object */
    off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */
    int canceled; /* True if this command was canceled by blocking side of VM */
    pthread_t thread; /* ID of the thread processing this entry */
} iojob;
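A job of type REDIS_IOJOB_LOAD loads a value from disk, and a job of type REDIS_IOJOB_DO_SWAP swaps a value out. Before a value can be swapped out, a job of type REDIS_IOJOB_PREPARE_SWAP has to be created to compute how many swap pages the value needs.
Whichever of the three types it is, a new job is put on the io_newjobs queue by queueIOJob. The thread entry function IOThreadEntryPoint moves a job from io_newjobs to server.io_processing, does the work for that job type (loading the value, computing the number of swap pages the value needs, or swapping the value out), and then moves the job from server.io_processing to io_processed. Finally it writes one byte to server.io_ready_pipe_write (io_ready_pipe_read and io_ready_pipe_write are the two ends of one pipe). The main thread, asleep in its event loop, wakes up because the pipe has become readable and runs vmThreadedIOCompletedJob, which does the follow-up work.
io_ready_clients is the list of clients that can resume running (they were blocked waiting for a value). The remaining fields are for thread synchronization and for global VM statistics.
VM is initialized in vmInit, which mostly sets up the structures described above. Beyond that, its most important job is to register vmThreadedIOCompletedJob as the handler for read events on the pipe. That function runs whenever the pipe becomes readable and is closely tied to how the threads operate.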
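The page count produced by a PREPARE_SWAP job comes down to a rounding-up division, because a page is never shared between two objects. Below is a minimal sketch; pagesNeeded is a hypothetical helper, and it assumes the serialized length of the value in bytes is already known.

```c
#include <assert.h>
#include <sys/types.h>

/* Number of pages needed to hold `bytes` bytes of serialized value.
 * Rounds up: a partly filled last page still counts as a whole page. */
off_t pagesNeeded(off_t bytes, off_t page_size) {
    assert(bytes >= 0 && page_size > 0);
    return (bytes + page_size - 1) / page_size;
}
```

With 32-byte pages, a 33-byte value needs 2 pages and a 100-byte value needs 4, which is why a small vm-page-size wastes less space on short values but costs more bitmap entries.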
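The pipe-based notification can be sketched on its own, without threads or the Redis event loop. All four function names below are hypothetical. The sketch assumes one byte is written per completed job and uses poll(2) in place of the event loop to test for readability.

```c
#include <assert.h>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

/* Create the notification pipe. The read end is made non-blocking so the
 * event-loop side can drain it without stalling. Returns 0 on success. */
int notifyPipeCreate(int fds[2]) {
    if (pipe(fds) == -1) return -1;
    int flags = fcntl(fds[0], F_GETFL);
    if (flags == -1 || fcntl(fds[0], F_SETFL, flags | O_NONBLOCK) == -1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }
    return 0;
}

/* Worker side: signal that one job has reached the processed list. */
int notifyJobDone(int write_fd) {
    char byte = 'x';
    return write(write_fd, &byte, 1) == 1 ? 0 : -1;
}

/* Event-loop side: returns 1 if the read end becomes readable within
 * timeout_ms milliseconds, 0 otherwise. */
int notifyPipeReadable(int read_fd, int timeout_ms) {
    struct pollfd p = { .fd = read_fd, .events = POLLIN, .revents = 0 };
    return poll(&p, 1, timeout_ms) == 1 && (p.revents & POLLIN);
}

/* Event-loop side: consume all pending bytes without blocking and return
 * how many completions they represent (one byte per completed job). */
int drainNotifications(int read_fd) {
    char buf[16];
    int done = 0;
    ssize_t n;
    while ((n = read(read_fd, buf, sizeof(buf))) > 0) done += (int)n;
    return done;
}
```

The byte carries no payload. It only makes the descriptor readable so that a thread sleeping on it wakes up; the job itself is still fetched from the processed list under the mutex.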
static void vmInit(void) {
    off_t totsize;
    int pipefds[2];
    size_t stacksize;
    struct flock fl;
    if (server.vm_max_threads != 0)
        zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
    redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file);
    /* Try to open the old swap file, otherwise create it */
    if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) {
        server.vm_fp = fopen(server.vm_swap_file,"w+b");
    }
    if (server.vm_fp == NULL) {
        redisLog(REDIS_WARNING,
            "Can't open the swap file: %s. Exiting.",
            strerror(errno));
        exit(1);
    }
    server.vm_fd = fileno(server.vm_fp);
    /* Lock the swap file for writing, this is useful in order to avoid
     * another instance to use the same swap file for a config error. */
    fl.l_type = F_WRLCK;
    fl.l_whence = SEEK_SET;
    fl.l_start = fl.l_len = 0;
    if (fcntl(server.vm_fd,F_SETLK,&fl) == -1) {
        redisLog(REDIS_WARNING,
            "Can't lock the swap file at '%s': %s. Make sure it is not used by another Redis instance.", server.vm_swap_file, strerror(errno));
        exit(1);
    }
    /* Initialize */
    server.vm_next_page = 0;
    server.vm_near_pages = 0;
    server.vm_stats_used_pages = 0;
    server.vm_stats_swapped_objects = 0;
    server.vm_stats_swapouts = 0;
    server.vm_stats_swapins = 0;
    totsize = server.vm_pages*server.vm_page_size;
    /* off_t is not guaranteed to be long long, so cast it for %lld */
    redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",
        (long long) totsize);
    if (ftruncate(server.vm_fd,totsize) == -1) {
        redisLog(REDIS_WARNING,"Can't ftruncate swap file: %s. Exiting.",
            strerror(errno));
        exit(1);
    } else {
        redisLog(REDIS_NOTICE,"Swap file allocated with success");
    }
    /* One bit per page, rounded up to whole bytes */
    server.vm_bitmap = zmalloc((server.vm_pages+7)/8);
    redisLog(REDIS_VERBOSE,"Allocated %lld bytes page table for %lld pages",
        (long long) (server.vm_pages+7)/8, (long long) server.vm_pages);
    memset(server.vm_bitmap,0,(server.vm_pages+7)/8);
    /* Initialize threaded I/O (used by Virtual Memory) */
    server.io_newjobs = listCreate();
    server.io_processing = listCreate();
    server.io_processed = listCreate();
    server.io_ready_clients = listCreate();
    pthread_mutex_init(&server.io_mutex,NULL);
    pthread_mutex_init(&server.obj_freelist_mutex,NULL);
    pthread_mutex_init(&server.io_swapfile_mutex,NULL);
    server.io_active_threads = 0;
    if (pipe(pipefds) == -1) {
        redisLog(REDIS_WARNING,"Unable to intialized VM: pipe(2): %s. Exiting."
            ,strerror(errno));
        exit(1);
    }
    server.io_ready_pipe_read = pipefds[0];
    server.io_ready_pipe_write = pipefds[1];
    redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR);
    /* LZF requires a lot of stack */
    pthread_attr_init(&server.io_threads_attr);
    pthread_attr_getstacksize(&server.io_threads_attr, &stacksize);
    /* Solaris may report a stacksize of 0, let's set it to 1 otherwise
     * multiplying it by 2 in the while loop later will not really help
     */
    if (!stacksize) stacksize = 1;
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&server.io_threads_attr, stacksize);
    /* Listen for events in the threaded I/O pipe */
    if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE,
        vmThreadedIOCompletedJob, NULL) == AE_ERR)
        oom("creating file event");
}
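vmInit sizes the bitmap at (vm_pages+7)/8 bytes and clears it, that is, one bit per page with every page initially free. The sketch below shows how such a bitmap could be queried, updated and searched for a run of consecutive free pages. The pageMap type and its functions are hypothetical, and the search is a plain first-fit scan from page 0; it does not use the vm_next_page and vm_near_pages hints that the server keeps.

```c
#include <assert.h>
#include <stdlib.h>
#include <sys/types.h>

/* Hypothetical stand-in for vm_bitmap plus the configured page count. */
typedef struct pageMap {
    unsigned char *bits;  /* one bit per page: 0 = free, 1 = used */
    off_t pages;          /* total number of pages */
} pageMap;

/* Allocate a zeroed bitmap (all pages free). Returns 0 on success. */
int pageMapInit(pageMap *m, off_t pages) {
    size_t bytes = (size_t)((pages + 7) / 8);
    m->bits = calloc(bytes ? bytes : 1, 1);
    m->pages = pages;
    return m->bits ? 0 : -1;
}

/* Returns 1 if the page is free, 0 if it is in use. */
int pageIsFree(const pageMap *m, off_t page) {
    assert(page >= 0 && page < m->pages);
    return (m->bits[page / 8] & (1 << (page & 7))) == 0;
}

/* Mark `count` pages starting at `first` as used (1) or free (0). */
void pageMark(pageMap *m, off_t first, off_t count, int used) {
    assert(first >= 0 && count >= 0 && first + count <= m->pages);
    for (off_t p = first; p < first + count; p++) {
        if (used) m->bits[p / 8] |= (unsigned char)(1 << (p & 7));
        else m->bits[p / 8] &= (unsigned char)~(1 << (p & 7));
    }
}

/* First-fit search for `count` consecutive free pages.
 * Returns the first page of the run, or -1 if no such run exists. */
off_t pageFindFree(const pageMap *m, off_t count) {
    assert(count > 0);
    off_t run = 0;
    for (off_t p = 0; p < m->pages; p++) {
        run = pageIsFree(m, p) ? run + 1 : 0;
        if (run == count) return p - count + 1;
    }
    return -1;
}
```

Swapping a value out would then amount to calling pageFindFree with the number of pages it needs, marking that run as used, and writing the value at the returned page; loading it back marks the same run free again.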