RDB命令
RDB有两种命令,一种是SAVE,另一种是BGSAVE。我们一起来看看他们的实现源码。
SAVE命令
按照Redis的命令定义,可以知道SAVE命令的底层代码实现是由saveCommand实现,于是去源码中找到了它。
#define CHILD_TYPE_RDB 1
void saveCommand(client *c) {
if (server.child_type == CHILD_TYPE_RDB) { //如果有子进程在rdb
addReplyError(c,"Background save already in progress");
return;
}
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi); //填充一些saveinfo的基本信息
if (rdbSave(server.rdb_filename,rsiptr) == C_OK) { //保存的主函数
addReply(c,shared.ok);
} else {
addReplyErrorObject(c,shared.err);
}
}
/* This structure can be optionally passed to RDB save/load functions in
* order to implement additional functionalities, by storing and loading
* metadata to the RDB file.
翻译:
此结构可以选择性的传递给RDB保存或者加载,可以通过存储和加载元数据到RDB文件来实现其他功能
* For example use to select a DB at load time, useful in
* replication in order to make sure that chained slaves (slaves of slaves)
* select the correct DB and are able to accept the stream coming from the
* top-level master. */
比如用它去选择一个DB,在确保链式从机选择正确的DB并且能接受最高级别的主机的stream
typedef struct rdbSaveInfo {
/* Used saving and loading. */
int repl_stream_db; /* DB to select in server.master client. */
/* Used only loading. */
int repl_id_is_set; /* True if repl_id field is set. */
char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */
long long repl_offset; /* Replication offset. */
} rdbSaveInfo;
rdbPopulateSaveInfo
#define RDB_SAVE_INFO_INIT {-1,0,"0000000000000000000000000000000000000000",-1}
/* Populate the rdbSaveInfo structure used to persist the replication
* information inside the RDB file. Currently the structure explicitly
* contains just the currently selected DB from the master stream, however
* if the rdbSave*() family functions receive a NULL rsi structure also
* the Replication ID/offset is not saved. The function populates 'rsi'
* that is normally stack-allocated in the caller, returns the populated
* pointer if the instance has a valid master client, otherwise NULL
* is returned, and the RDB saving will not persist any replication related
* information. */
填充rdbSaveInfo数据结构,用于持久化rdb文件中的复制信息。目前这个结构显示包含了当前从主机中选择的数据库,然而当rdbSave*()的方法收到一个空的rsi结构,id和offset的复制信息也没有保存。这个填充rsi的方法是calle,返回一个填充后的指针如果实力有一个有效的主,否则就是返回空。并且rdb不糊持久化任何相关的信息。
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT;
*rsi = rsi_init;
/* If the instance is a master, we can populate the replication info
* only when repl_backlog is not NULL. If the repl_backlog is NULL,
* it means that the instance isn't in any replication chains. In this
* scenario the replication info is useless, because when a slave
* connects to us, the NULL repl_backlog will trigger a full
* synchronization, at the same time we will use a new replid and clear
* replid2. */
如果当前实例是主机,我们能填充复制信息当repl_backlog非空的时候。如果repl_backlog是空的,它表示实例不在任何的复制链上,在这种场景复制信息时没有用的,因为当一个从机连我们的时候,这个空的repl_backlog将会触发一次全量复制,同事我们会用一个新的replid并且清理replid2。
if (!server.masterhost && server.repl_backlog) {
/* Note that when server.slaveseldb is -1, it means that this master
* didn't apply any write commands after a full synchronization.
* So we can let repl_stream_db be 0, this allows a restarted slave
* to reload replication ID/offset, it's safe because the next write
* command must generate a SELECT statement. */
注意当服务器的server.slaveseldb为-1的时候,这意味着主机没有应用热河写入命令在全量同步后,所以我们可以让repl_stream_db变为0,这会允许重启一个从服务器去重在家复制的id,这个是安全的因为下一次写入肯定会生成一个select语句
rsi->repl_stream_db = server.slaveseldb == -1 ? 0 : server.slaveseldb;
return rsi;
}
/* If the instance is a slave we need a connected master
* in order to fetch the currently selected DB. */
if (server.master) {
rsi->repl_stream_db = server.master->db->id;
return rsi;
}
/* If we have a cached master we can use it in order to populate the
* replication selected DB info inside the RDB file: the slave can
* increment the master_repl_offset only from data arriving from the
* master, so if we are disconnected the offset in the cached master
* is valid. */
如果我们有一个缓存主机,我们能用它去填充复制选择db的信息在RDBfile中,这个slave能增加master_repl_offset当信息从master到达,因此,如果我们断开连接,缓存的主机中的偏移量是有效的
if (server.cached_master) {
rsi->repl_stream_db = server.cached_master->db->id;
return rsi;
}
return NULL;
}
rdbSave(server.rdb_filename,rsiptr)
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSave(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp = NULL;
rio rdb;
int error = 0;
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Failed opening the RDB file %s (in server root dir %s) "
"for saving: %s",
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
return C_ERR;
}
// 初始化I/0,便于后续写入文件
rioInitWithFile(&rdb,fp);
startSaving(RDBFLAGS_NONE);
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) { //主函数
errno = error;
goto werr;
}
/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp)) goto werr;
if (fsync(fileno(fp))) goto werr;
if (fclose(fp)) { fp = NULL; goto werr; }
fp = NULL;
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
//当生成DBfile的时候,重命名以确保DB文件原子性修改
if (rename(tmpfile,filename) == -1) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Error moving temp DB file %s on the final "
"destination %s (in server root dir %s): %s",
tmpfile,
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
serverLog(LL_NOTICE,"DB saved on disk");
server.dirty = 0;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
stopSaving(1);
return C_OK;
werr:
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
if (fp) fclose(fp);
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
startSaving和stopSaving
void startSaving(int rdbflags) {
/* Fire the persistence modules end event. */
int subevent;
if (rdbflags & RDBFLAGS_AOF_PREAMBLE)
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_AOF_START;
else if (getpid()!=server.pid)
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_RDB_START;
else
subevent = REDISMODULE_SUBEVENT_PERSISTENCE_SYNC_RDB_START;
moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,subevent,NULL);
}
void stopSaving(int success) {
/* Fire the persistence modules end event. */
moduleFireServerEvent(REDISMODULE_EVENT_PERSISTENCE,
success?
REDISMODULE_SUBEVENT_PERSISTENCE_ENDED:
REDISMODULE_SUBEVENT_PERSISTENCE_FAILED,
NULL);
}
直接到保存的代码
/* Produces a dump of the database in RDB format sending it to the specified
* Redis I/O channel. On success C_OK is returned, otherwise C_ERR
* is returned and part of the output, or all the output, can be
* missing because of I/O errors.
*
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
uint64_t cksum;
size_t processed = 0;
int j;
long key_count = 0;
long long info_updated_time = 0;
char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB"; //可以看出aof的rewrite和rdb都是用这个函数
//设置校验和
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum; //设置校验和的函数
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
//写入此时系统相关信息
if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
// 遍历所有数据库
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
/* Write the SELECT DB opcode */
// 存入数据库编号
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(rdb,j) == -1) goto werr;
/* Write the RESIZE DB opcode. */
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
size_t rdb_bytes_before_key = rdb->processed_bytes;
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire,j) == -1) goto werr;
/* In fork child process, we can try to release memory back to the
* OS and possibly avoid or decrease COW. We give the dismiss
* mechanism a hint about an estimated size of the object we stored. */
在子进程中我们可以尝试去释放内存有可能降低cow,我们给这个释放的机制一个估计我们存储对象大小的一个估计值
size_t dump_size = rdb->processed_bytes - rdb_bytes_before_key;
if (server.in_fork_child) dismissObject(o, dump_size);
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
当RDB产生当作rewrite的一部分,移动积累不同从父到子当在rewriting为了有更小的的最终写入
if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
/* Update child info every 1 second (approximately).
* in order to avoid calling mstime() on each iteration, we will
* check the diff every 1024 keys */
if ((key_count++ & 1023) == 0) {
long long now = mstime();
if (now - info_updated_time >= 1000) {
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, pname);
info_updated_time = now;
}
}
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
/* If we are storing the replication information on disk, persist
* the script cache as well: on successful PSYNC after a restart, we need
* to be able to process any EVALSHA inside the replication backlog the
* master will send us. */
if (rsi && dictSize(server.lua_scripts)) {
di = dictGetIterator(server.lua_scripts);
while((de = dictNext(di)) != NULL) {
robj *body = dictGetVal(de);
if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
goto werr;
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
/* EOF opcode */
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
/* CRC64 checksum. It will be zero if checksum computation is disabled, the
* loading code skips the check in this case. */
cksum = rdb->cksum;
memrev64ifbe(&cksum);
if (rioWrite(rdb,&cksum,8) == 0) goto werr;
return C_OK;
werr:
if (error) *error = errno;
if (di) dictReleaseIterator(di);
return C_ERR;
}
后面加的东西太多了,需要好好分模块阅读下。
BGSAVE命令
BGSAVE命令是开一个进程,然后存储RDB文件在该进程中执行,属于后台存储。该存储方式需要注意一下几种情况。
- 如果后台正在进行RDB存储,则返回错误
- 如果后台正在进行AOF存储,则将rdb_bgsave_scheduled参数置1,等到系统函数
serverCron
定期执行的时候,检查参数,并执行BGSAVE命令
其源码如下:
/* BGSAVE [SCHEDULE] */
void bgsaveCommand(client *c) {
int schedule = 0;
/* The SCHEDULE option changes the behavior of BGSAVE when an AOF rewrite
* is in progress. Instead of returning an error a BGSAVE gets scheduled. */
if (c->argc > 1) {
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) {
schedule = 1;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (server.child_type == CHILD_TYPE_RDB) {
addReplyError(c,"Background save already in progress");
} else if (hasActiveChildProcess()) {
if (schedule) {
server.rdb_bgsave_scheduled = 1;
addReplyStatus(c,"Background saving scheduled");
} else {
addReplyError(c,
"Another child process is active (AOF?): can't BGSAVE right now. "
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
"possible.");
}
} else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) {
addReplyStatus(c,"Background saving started");
} else {
addReplyErrorObject(c,shared.err);
}
}
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
if (hasActiveChildProcess()) return C_ERR;
server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
int retval;
/* Child */
redisSetProcTitle("redis-rdb-bgsave");
redisSetCpuAffinity(server.bgsave_cpulist);
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
if (childpid == -1) {
server.lastbgsave_status = C_ERR;
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return C_ERR;
}
serverLog(LL_NOTICE,"Background saving started by pid %ld",(long) childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
return C_OK;
}
return C_OK; /* unreached */
}
自动保存
在Redis.conf文件中,可以配置服务器定期执行SAVE命令,该参数如下:
save 900 1
save 300 10
save 60 10000
struct saveparam {
time_t seconds;
int changes;
};
另外,在redisServer结构体中有如下参数,用来记录上述要求。
struct redisServer {
// ...
struct saveparam * saveparam ;
// ...
long long dirty; // 服务器的脏数据,表示没有进行持久化的数据
time_t lastsave; // 上一次执行保存的时间
}
serverCron函数中的代码片段
if (hasActiveChildProcess() || ldbPendingChildren())
else{
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
/* Save if we reached the given amount of changes,
* the given amount of seconds, and if the latest bgsave was
* successful or if, in case of an error, at least
* CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}
还有一个CONFIG_BGSAVE_RETRY_DELAY和server.lastbgsave_status这两个参数的限制。