From 249ad25f4f495057d11672f5fbdcb763860b72e1 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 5 Jan 2011 18:38:31 +0100 Subject: [PATCH] BGSAVE work in progress --- src/diskstore.c | 47 ++++++++++++++++++++++++++++++++++++++++++++ src/dscache.c | 22 +++++++++++++++++---- src/rdb.c | 19 ++++++++++++------ src/redis.h | 1 + utils/whatisdoing.sh | 18 +++++++++++++++++ 5 files changed, 97 insertions(+), 10 deletions(-) create mode 100755 utils/whatisdoing.sh diff --git a/src/diskstore.c b/src/diskstore.c index 7250390eb..26f3af607 100644 --- a/src/diskstore.c +++ b/src/diskstore.c @@ -348,3 +348,50 @@ void dsFlushDb(int dbid) { } } } + +int dsRdbSave(char *filename) { + char tmpfile[256]; + int j, i; + time_t now = time(NULL); + + snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); + fp = fopen(tmpfile,"w"); + if (!fp) { + redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno)); + return REDIS_ERR; + } + if (fwrite("REDIS0001",9,1,fp) == 0) goto werr; + + /* Scan all diskstore dirs looking for keys */ + for (j = 0; j < 256; j++) { + for (i = 0; i < 256; i++) { + snprintf(buf,1024,"%s/%02x/%02x",server.ds_path,j,i); + + /* Write the SELECT DB opcode */ + if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr; + if (rdbSaveLen(fp,j) == -1) goto werr; + } + } + + /* Make sure data will not remain on the OS's output buffers */ + fflush(fp); + fsync(fileno(fp)); + fclose(fp); + + /* Use RENAME to make sure the DB file is changed atomically only + * if the generate DB file is ok. */ + if (rename(tmpfile,filename) == -1) { + redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno)); + unlink(tmpfile); + return REDIS_ERR; + } + redisLog(REDIS_NOTICE,"DB saved on disk"); + server.dirty = 0; + server.lastsave = time(NULL); + return REDIS_OK; + +werr: + fclose(fp); + unlink(tmpfile); + redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno)); +} diff --git a/src/dscache.c b/src/dscache.c index 4ebca7087..1adba6f56 100644 --- a/src/dscache.c +++ b/src/dscache.c @@ -512,7 +512,7 @@ int processActiveIOJobs(int max) { #if 0 /* If there are new jobs we need to signal the thread to - * process the next one. */ + * process the next one. FIXME: drop this if useless. */ redisLog(REDIS_DEBUG,"waitEmptyIOJobsQueue: new %d, processing %d", listLength(server.io_newjobs), listLength(server.io_processing)); @@ -576,7 +576,21 @@ void queueIOJob(iojob *j) { spawnIOThread(); } -void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) { +/* Consume all the IO scheduled operations, and all the thread IO jobs + * so that eventually the state of diskstore is a point-in-time snapshot. + * + * This is useful when we need to BGSAVE with diskstore enabled. */ +void cacheForcePointInTime(void) { + redisLog(REDIS_NOTICE,"Diskstore: synching on disk to reach point-in-time state."); + while (listLength(server.cache_io_queue) != 0) { + cacheScheduleIOPushJobs(REDIS_IO_ASAP); + processActiveIOJobs(1); + } + waitEmptyIOJobsQueue(); + processAllPendingIOJobs(); +} + +void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val) { iojob *j; j = zmalloc(sizeof(*j)); @@ -762,7 +776,7 @@ int cacheScheduleIOPushJobs(int flags) { op->type == REDIS_IO_LOAD ? "load" : "save", op->key->ptr); if (op->type == REDIS_IO_LOAD) { - dsCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL); + cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL); } else { /* Lookup the key, in order to put the current value in the IO * Job. Otherwise if the key does not exists we schedule a disk @@ -775,7 +789,7 @@ int cacheScheduleIOPushJobs(int flags) { * the key on disk. */ val = NULL; } - dsCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val); + cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val); } /* Mark the operation as in progress. */ cacheScheduleIODelFlag(op->db,op->key,op->type); diff --git a/src/rdb.c b/src/rdb.c index 60d0a6ce2..6b6b6ab64 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -429,8 +429,10 @@ int rdbSave(char *filename) { int j; time_t now = time(NULL); - /* FIXME: implement .rdb save for disk store properly */ - redisAssert(server.ds_enabled == 0); + if (server.ds_enabled) { + cacheForcePointInTime(); + return dsRdbSave(filename); + } snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); fp = fopen(tmpfile,"w"); @@ -495,17 +497,22 @@ int rdbSaveBackground(char *filename) { pid_t childpid; if (server.bgsavechildpid != -1) return REDIS_ERR; - redisAssert(server.ds_enabled == 0); + server.dirty_before_bgsave = server.dirty; + if ((childpid = fork()) == 0) { + int retval; + /* Child */ if (server.ipfd > 0) close(server.ipfd); if (server.sofd > 0) close(server.sofd); - if (rdbSave(filename) == REDIS_OK) { - _exit(0); + if (server.ds_enabled) { + cacheForcePointInTime(); + dsRdbSave(filename); } else { - _exit(1); + rdbSave(filename); } + _exit((retval == REDIS_OK) ? 0 : 1); } else { /* Parent */ if (childpid == -1) { diff --git a/src/redis.h b/src/redis.h index 3486307ec..495de9859 100644 --- a/src/redis.h +++ b/src/redis.h @@ -813,6 +813,7 @@ void cacheCron(void); int cacheKeyMayExist(redisDb *db, robj *key); void cacheSetKeyMayExist(redisDb *db, robj *key); void cacheSetKeyDoesNotExist(redisDb *db, robj *key); +void cacheForcePointInTime(void); /* Set data type */ robj *setTypeCreate(robj *value); diff --git a/utils/whatisdoing.sh b/utils/whatisdoing.sh new file mode 100755 index 000000000..8f441cfc0 --- /dev/null +++ b/utils/whatisdoing.sh @@ -0,0 +1,18 @@ +# This script is from http://poormansprofiler.org/ + +#!/bin/bash +nsamples=1 +sleeptime=0 +pid=$(pidof redis-server) + +for x in $(seq 1 $nsamples) + do + gdb -ex "set pagination 0" -ex "thread apply all bt" -batch -p $pid + sleep $sleeptime + done | \ +awk ' + BEGIN { s = ""; } + /Thread/ { print s; s = ""; } + /^\#/ { if (s != "" ) { s = s "," $4} else { s = $4 } } + END { print s }' | \ +sort | uniq -c | sort -r -n -k 1,1