AOF重写
现在我们来分析一种情况,假设服务器执行了以下两个命令:
SET key value
DEL key
如果上述两个命令都执行成功了,AOF中必然会增加两条命令字符串,然而这对数据库根本没什么影响,如果服务器执行了大量这样的命令对,AOF是只能追加不能删除的,所以其文件体积会无限增大。考虑周全的Redis为客户提供了重写操作,用来重写AOF文件,剔除掉里面的无效命令对。
要执行AOF重写,最简单的步骤就是:遍历服务器每一个数据库中的数据,将每一个key对应的对象,都用一条命令来表达,并存储在AOF文件中。
有了这个思路,我们就去源码中看看:AOF重写的功能由 rewriteAppendOnlyFile 函数实现。
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*写一序列命令去完整的rebuild这个dataset到filename,使用REWRITEAOF and BGREWRITEAOF.
* In order to minimize the number of commands needed in the rewritten
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command. */
为了最小化命令的数量,记录redis用可变的命令,比如...然而在最大AOF_REWRITE_ITEMS_PER_CMD在一条命令中最多项
int rewriteAppendOnlyFile(char *filename) {
rio aof;
FILE *fp = NULL;
char tmpfile[256];
char byte;
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function. */
注意我们必须使用不同的临时名字去比较
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return C_ERR;
}
server.aof_child_diff = sdsempty();
rioInitWithFile(&aof,fp);
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
startSaving(RDBFLAGS_AOF_PREAMBLE);
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
/* Do an initial slow fsync here while the parent is still sending
* data, in order to make the next final fsync faster. */
做一个初始化慢速同步当父还在发送数据,为了使下一次同步更快
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
/* Read again a few times to get more data from the parent.
* We can't read forever (the server may receive data from clients
* faster than it is able to send data to the child), so we try to read
* some more data in a loop as soon as there is a good chance more data
* will come. If it looks like we are wasting time, we abort (this
* happens after 20 ms without new data). */
在读几次从而获取更多信息从父,我们不能一直读,所以我们尝试去从loop中读数据因为这里有一个好机会去,如果这看起来是一个好机会更多的数据会来,如果看起来我们像使浪费事件,我们放弃
int nodata = 0;
mstime_t start = mstime();
while(mstime()-start < 1000 && nodata < 20) {
if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
{
nodata++;
continue;
}
nodata = 0; /* Start counting from zero, we stop on N *contiguous*
timeouts. */
aofReadDiffFromParent();
}
/* Ask the master to stop sending diffs. */
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
goto werr;
/* We read the ACK from the server using a 5 seconds timeout. Normally
* it should reply ASAP, but just in case we lose its reply, we are sure
* the child will eventually get terminated. */
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
byte != '!') goto werr;
serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
/* Read the final diff if any. */
aofReadDiffFromParent();
/* Write the received diff to the file. */
serverLog(LL_NOTICE,
"Concatenating %.2f MB of AOF diff received from parent.",
(double) sdslen(server.aof_child_diff) / (1024*1024));
/* Now we write the entire AOF buffer we received from the parent
* via the pipe during the life of this fork child.
* once a second, we'll take a break and send updated COW info to the parent */
现在我们写入了完整的aof的buffer我们从父提供pipe获取的,我们会休息下并且发送updateed cow信息给父
size_t bytes_to_write = sdslen(server.aof_child_diff);
const char *buf = server.aof_child_diff;
long long cow_updated_time = mstime();
long long key_count = dbTotalServerKeyCount();
while (bytes_to_write) {
/* We write the AOF buffer in chunk of 8MB so that we can check the time in between them */
size_t chunk_size = bytes_to_write < (8<<20) ? bytes_to_write : (8<<20);
if (rioWrite(&aof,buf,chunk_size) == 0)
goto werr;
bytes_to_write -= chunk_size;
buf += chunk_size;
/* Update COW info */
long long now = mstime();
if (now - cow_updated_time >= 1000) {
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite");
cow_updated_time = now;
}
}
/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp)) goto werr;
if (fsync(fileno(fp))) goto werr;
if (fclose(fp)) { fp = NULL; goto werr; }
fp = NULL;
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {
serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
stopSaving(1);
return C_OK;
werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
if (fp) fclose(fp);
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
具体到每一个键对应的值对象重写时,可能没有想象的那么简单:该值里面存放的元素可能很多,如果仍然只用一条命令来表达,会造成缓冲区溢出。于是,Redis用上文注释中提到的 AOF_REWRITE_ITEMS_PER_CMD 参数来限制单条命令最多写入的元素个数,超出上限时拆分成多条命令。
AOF后台重写
上述的重写会阻塞服务器,如果数据量大的话,服务器会一直阻塞于此,所以和RDB一样,Redis也为AOF持久化提供了后台重写的函数。
很明显,提到后台重写就需要创建一个子进程,来执行AOF重写操作,这样就可以避免主线程被阻塞,服务器长时间无法工作。
但是,子进程在执行AOF重写的时候,服务器当前还在发生数据变化,为此,Redis提供了一个AOF后台重写缓冲区,用来存放子进程在执行AOF重写过程中插入的新数据。
/* Capacity of a single AOF rewrite buffer block: at most 10 MB. */
#define AOF_RW_BUF_BLOCK_SIZE (10*1024*1024)

/* One block of the AOF background-rewrite buffer.
 * The block size is fixed because the number of command strings that will
 * be appended while a rewrite is in progress is not known in advance. */
typedef struct aofrwblock {
    unsigned long used;                 /* Amount of buf already in use. */
    unsigned long free;                 /* Amount of buf still available. */
    char buf[AOF_RW_BUF_BLOCK_SIZE];    /* Storage for the command strings. */
} aofrwblock;
这样一来,在子进程执行AOF重写的时候,服务器如果有新数据到来,其字符串命令会同时添加到两个缓冲区:
- 一是AOF缓冲区,保证原AOF缓冲区的内容会定期被写入和同步到现有的AOF文件中
- 二是AOF后台重写缓冲区,可以使得AOF后台重写不会错过新数据,相当于做了双重保险,命令不会丢失。
那么整个AOF后台重写过程会进行如下三个操作:
- 对现有数据库中的键值对转换成字符串命令,并写入和同步到临时AOF后台重写文件中
- 完成上述步骤后,将AOF后台重写缓冲区的数据存入临时AOF后台重写文件中
- 最后,执行更名操作,覆盖原有的AOF文件,完成新旧更替
AOF后台重写操作由如下两个函数完成,这里就不列出源码了。代码太多了而且都差不多。
// Start an AOF rewrite in the background.
int rewriteAppendOnlyFileBackground(void);
// Completion handler for a background AOF rewrite. Per the accompanying
// article (definition not shown here), it writes out the data held in the
// AOF background-rewrite buffer and performs the rename that completes the
// rewrite. The parameters are the child's exit code and terminating signal.
void backgroundRewriteDoneHandler(int exitcode, int bysignal);
最后,只剩下一个问题了:除了显式地运行命令之外,Redis还会在什么时候执行后台重写操作?我一路追溯到这段代码:
/* Excerpt from the serverCron function in server.c. */
// An automatic rewrite is considered only when AOF is enabled, no child
// process is currently active, a rewrite percentage is configured (non-zero),
// and the current AOF size exceeds the configured minimum rewrite size.
/* Trigger an AOF rewrite if needed. */
if (server.aof_state == AOF_ON &&
!hasActiveChildProcess() &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
// Baseline for the growth computation; fall back to 1 when the recorded
// base size is zero so the division below never divides by zero.
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
// Growth of the current size over the baseline, expressed as a percentage.
long long growth = (server.aof_current_size*100/base) - 100;
// Start a background rewrite once growth reaches the configured percentage.
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
/* Excerpt from server.c (the article presents it in the context of
 * serverCron).
 *
 * Non-blocking check for a terminated child process. If waitpid() reports
 * one, decode how it ended and hand the result to the completion handler
 * matching server.child_type (RDB save, AOF rewrite or module fork). Any
 * outcome other than "no child changed state" ends by calling
 * replicationStartPendingFork(). */
void checkChildrenDone(void) {
    int wstatus = 0;
    pid_t reaped = waitpid(-1, &wstatus, WNOHANG);

    /* With WNOHANG a return of 0 means no child changed state. */
    if (reaped == 0) return;

    /* Decode the wait status: exit code (or -1 when the child did not exit
     * normally) and the terminating signal (or 0 when there was none). */
    int exitcode = -1;
    if (WIFEXITED(wstatus)) exitcode = WEXITSTATUS(wstatus);
    int bysignal = WIFSIGNALED(wstatus) ? WTERMSIG(wstatus) : 0;

    /* sigKillChildHandler catches the signal and calls exit(), but we
     * must make sure not to flag lastbgsave_status, etc incorrectly.
     * We could directly terminate the child process via SIGUSR1
     * without handling it */
    if (exitcode == SERVER_CHILD_NOERROR_RETVAL) {
        bysignal = SIGUSR1;
        exitcode = 1;
    }

    if (reaped == -1) {
        /* waitpid() itself failed. */
        serverLog(LL_WARNING,"waitpid() returned an error: %s. "
            "child_type: %s, child_pid = %d",
            strerror(errno),
            strChildType(server.child_type),
            (int) server.child_pid);
    } else if (reaped != server.child_pid) {
        /* Not the tracked child: give ldbRemoveChild() a chance to claim
         * the pid, and warn if it does not. */
        if (!ldbRemoveChild(reaped)) {
            serverLog(LL_WARNING,
                "Warning, detected child with unmatched pid: %ld",
                (long) reaped);
        }
    } else {
        /* The tracked child finished: dispatch on its type. */
        if (server.child_type == CHILD_TYPE_RDB) {
            backgroundSaveDoneHandler(exitcode, bysignal);
        } else if (server.child_type == CHILD_TYPE_AOF) {
            backgroundRewriteDoneHandler(exitcode, bysignal);
        } else if (server.child_type == CHILD_TYPE_MODULE) {
            ModuleForkDoneHandler(exitcode, bysignal);
        } else {
            serverPanic("Unknown child type %d for child pid %d", server.child_type, server.child_pid);
            exit(1);
        }

        /* Child info is collected only after a clean exit. */
        if (!bysignal && exitcode == 0) receiveChildInfo();
        resetChildState();
    }

    /* start any pending forks immediately. */
    replicationStartPendingFork();
}