redis-aof-2

2021-12-02

AOF重写

现在我们来分析一种情况,假设服务器执行了以下两个命令:

SET key value
DEL key

如果上述两个命令都执行成功,AOF文件中必然会追加两条命令,然而它们对数据库的最终状态没有任何影响。由于AOF文件只能追加、不能删除,如果服务器执行了大量这样的命令对,AOF文件的体积就会无限增大。为此,Redis提供了AOF重写操作:用一份更精简的新AOF文件替换旧文件,从而剔除其中这类无效的命令。

要执行AOF重写,最简单的思路是:遍历服务器每一个数据库中的数据,将每一个key对应的值对象都用一条命令来表达,并写入新的AOF文件。注意,重写并不需要读取或分析旧的AOF文件,而是直接根据数据库的当前状态生成新文件。

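有了这个思路,我们再去源码中看看。AOF重写的功能由rewriteAppendOnlyFile函数实现。需要注意:当开启aof-use-rdb-preamble(即代码中的server.aof_use_rdb_preamble)时,数据库快照部分会以RDB格式写入(rdbSaveRio);否则才由rewriteAppendOnlyFileRio逐个键生成命令。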

/* Write a sequence of commands able to fully rebuild the dataset into
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *写一序列命令去完整的rebuild这个dataset到filename,使用REWRITEAOF and BGREWRITEAOF.
 * In order to minimize the number of commands needed in the rewritten
 * log Redis uses variadic commands when possible, such as RPUSH, SADD
 * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
 * are inserted using a single command. */
 为了最小化命令的数量,记录redis用可变的命令,比如...然而在最大AOF_REWRITE_ITEMS_PER_CMD在一条命令中最多项
int rewriteAppendOnlyFile(char *filename) {
    rio aof;
    FILE *fp = NULL;
    char tmpfile[256];
    char byte;

    /* Note that we have to use a different temp name here compared to the
     * one used by rewriteAppendOnlyFileBackground() function. */
     注意我们必须使用不同的临时名字去比较
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return C_ERR;
    }

    server.aof_child_diff = sdsempty();
    rioInitWithFile(&aof,fp);

    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);

    startSaving(RDBFLAGS_AOF_PREAMBLE);

    if (server.aof_use_rdb_preamble) {
        int error;
        if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
            errno = error;
            goto werr;
        }
    } else {
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
    }

    /* Do an initial slow fsync here while the parent is still sending
     * data, in order to make the next final fsync faster. */
     做一个初始化慢速同步当父还在发送数据,为了使下一次同步更快
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;

    /* Read again a few times to get more data from the parent.
     * We can't read forever (the server may receive data from clients
     * faster than it is able to send data to the child), so we try to read
     * some more data in a loop as soon as there is a good chance more data
     * will come. If it looks like we are wasting time, we abort (this
     * happens after 20 ms without new data). */
     	在读几次从而获取更多信息从父,我们不能一直读,所以我们尝试去从loop中读数据因为这里有一个好机会去,如果这看起来是一个好机会更多的数据会来,如果看起来我们像使浪费事件,我们放弃
    int nodata = 0;
    mstime_t start = mstime();
    while(mstime()-start < 1000 && nodata < 20) {
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
        {
            nodata++;
            continue;
        }
        nodata = 0; /* Start counting from zero, we stop on N *contiguous*
                       timeouts. */
        aofReadDiffFromParent();
    }

    /* Ask the master to stop sending diffs. */
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
    if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
        goto werr;
    /* We read the ACK from the server using a 5 seconds timeout. Normally
     * it should reply ASAP, but just in case we lose its reply, we are sure
     * the child will eventually get terminated. */
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
        byte != '!') goto werr;
    serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");

    /* Read the final diff if any. */
    aofReadDiffFromParent();

    /* Write the received diff to the file. */
    serverLog(LL_NOTICE,
        "Concatenating %.2f MB of AOF diff received from parent.",
        (double) sdslen(server.aof_child_diff) / (1024*1024));

    /* Now we write the entire AOF buffer we received from the parent
     * via the pipe during the life of this fork child.
     * once a second, we'll take a break and send updated COW info to the parent */
     现在我们写入了完整的aof的buffer我们从父提供pipe获取的,我们会休息下并且发送updateed cow信息给父
    size_t bytes_to_write = sdslen(server.aof_child_diff);
    const char *buf = server.aof_child_diff;
    long long cow_updated_time = mstime();
    long long key_count = dbTotalServerKeyCount();
    while (bytes_to_write) {
        /* We write the AOF buffer in chunk of 8MB so that we can check the time in between them */
        size_t chunk_size = bytes_to_write < (8<<20) ? bytes_to_write : (8<<20);

        if (rioWrite(&aof,buf,chunk_size) == 0)
            goto werr;

        bytes_to_write -= chunk_size;
        buf += chunk_size;

        /* Update COW info */
        long long now = mstime();
        if (now - cow_updated_time >= 1000) {
            sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, "AOF rewrite");
            cow_updated_time = now;
        }
    }

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp)) goto werr;
    if (fsync(fileno(fp))) goto werr;
    if (fclose(fp)) { fp = NULL; goto werr; }
    fp = NULL;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        stopSaving(0);
        return C_ERR;
    }
    serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
    stopSaving(1);
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
    if (fp) fclose(fp);
    unlink(tmpfile);
    stopSaving(0);
    return C_ERR;
}

具体到每一个键对应的值对象的重写,可能没有想象的那么简单:如果该值中存放的元素很多,却仍然只用一条命令来表达,这条命令就会非常大,可能造成缓冲区溢出。于是,Redis用参数AOF_REWRITE_ITEMS_PER_CMD(在server.h中定义为64)限制一条命令最多写入的元素个数。超出部分会被拆分成多条RPUSH、SADD、ZADD等命令。

AOF后台重写

如果上述重写在服务器主进程中同步执行,就会阻塞服务器;数据量大时,服务器会长时间阻塞于此。所以和RDB一样,Redis也为AOF持久化提供了后台重写的函数。(实际上,从上面代码中与父进程之间的管道通信可以看出,在较新的版本中,rewriteAppendOnlyFile本身就是在后台重写的子进程中被调用的。)

很明显,后台重写需要创建一个子进程来执行AOF重写操作,这样可以避免主进程被阻塞、服务器长时间无法处理请求。

但是,子进程在执行AOF重写的同时,服务器(父进程)仍在继续处理写命令,数据库状态还在变化。为此,Redis提供了一个AOF后台重写缓冲区,用来存放子进程执行AOF重写期间服务器新执行的写命令。

/* Capacity of one AOF rewrite buffer block: 10 MB. */
#define AOF_RW_BUF_BLOCK_SIZE (10*1024*1024)

/* One block of the AOF background-rewrite buffer.
 * The block size is fixed because how many command strings will arrive
 * while the rewrite runs is not known in advance (presumably the buffer is
 * made of as many such blocks as needed). */
typedef struct aofrwblock {
    unsigned long used;              /* portion of buf already filled */
    unsigned long free;              /* portion of buf still available */
    char buf[AOF_RW_BUF_BLOCK_SIZE]; /* serialized command strings */
} aofrwblock;

这样一来,在子进程执行AOF重写期间,服务器每执行一条写命令,都会把对应的命令字符串同时追加到两个缓冲区:

  • 一是AOF缓冲区,保证这些命令照常被定期写入并同步到现有的AOF文件中,这样即使重写失败,旧AOF文件也是完整的
  • 二是AOF后台重写缓冲区,保证AOF后台重写不会错过重写期间产生的新数据,新AOF文件同样不会丢失命令。

整个AOF后台重写过程会进行如下三个操作(第一步由子进程完成;子进程退出后,父进程会在backgroundRewriteDoneHandler中完成收尾工作):

  • 对现有数据库中的键值对转换成字符串命令,并写入和同步到临时AOF后台重写文件中
  • 完成上述步骤后,将AOF后台重写缓冲区中的数据追加到临时AOF后台重写文件中(从上面的代码可以看到,子进程会先通过管道从父进程读取这部分差量数据并写入文件)
  • 最后,执行更名操作,覆盖原有的AOF文件,完成新旧更替

AOF后台重写操作由如下两个函数完成。这里就不列出源码了:代码较长,而且逻辑与上面的重写过程大同小异。

/* Start an AOF rewrite in the background, in a child process. */
extern int rewriteAppendOnlyFileBackground(void);
/* Finish a background AOF rewrite once the rewrite child has exited:
 * handles the rewrite-buffer data and the rename that replaces the old AOF. */
extern void backgroundRewriteDoneHandler(int exitcode, int bysignal);

最后只剩下一个问题:除了显式执行BGREWRITEAOF命令之外,Redis还会在什么时候执行后台重写操作?一路追溯下去,可以找到这段代码:

/* Excerpt from the serverCron() function in server.c. */
/* Automatically start a background AOF rewrite when all of these hold:
 *  - AOF is enabled (aof_state == AOF_ON);
 *  - hasActiveChildProcess() reports no active child process;
 *  - automatic rewriting is enabled (aof_rewrite_perc is non-zero);
 *  - the current AOF size exceeds aof_rewrite_min_size;
 *  - the AOF has grown by at least aof_rewrite_perc percent relative to
 *    aof_rewrite_base_size (presumably the size after the last rewrite --
 *    TODO confirm). */

/* Trigger an AOF rewrite if needed. */
        if (server.aof_state == AOF_ON &&
            !hasActiveChildProcess() &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            /* Use 1 as the base when no base size is recorded, so the
             * division below can never divide by zero. */
            long long base = server.aof_rewrite_base_size ?
                server.aof_rewrite_base_size : 1;
            /* Growth over the base size, in percent (100 == doubled). */
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
/* Excerpted from server.c (the original note places it in the serverCron()
 * code path). */
/* Reap a finished child process, if any, without blocking (waitpid() with
 * WNOHANG) and pass its exit status to the completion handler matching
 * server.child_type: RDB save, AOF rewrite or module fork. A reaped pid that
 * is not the tracked child is handed to ldbRemoveChild(). Whenever waitpid()
 * returns a non-zero value, replicationStartPendingFork() runs afterwards. */
void checkChildrenDone(void) {
    int statloc = 0;
    pid_t pid = waitpid(-1, &statloc, WNOHANG);

    /* 0: no child has changed state yet, nothing to do. */
    if (pid == 0) return;

    int exitcode;
    if (WIFEXITED(statloc)) {
        exitcode = WEXITSTATUS(statloc);
    } else {
        exitcode = -1;
    }
    int bysignal = WIFSIGNALED(statloc) ? WTERMSIG(statloc) : 0;

    /* sigKillChildHandler catches the signal and calls exit(), but we
     * must make sure not to flag lastbgsave_status, etc incorrectly.
     * We could directly terminate the child process via SIGUSR1
     * without handling it */
    if (exitcode == SERVER_CHILD_NOERROR_RETVAL) {
        exitcode = 1;
        bysignal = SIGUSR1;
    }

    if (pid == -1) {
        serverLog(LL_WARNING,"waitpid() returned an error: %s. "
            "child_type: %s, child_pid = %d",
            strerror(errno),
            strChildType(server.child_type),
            (int) server.child_pid);
    } else if (pid != server.child_pid) {
        /* Not the tracked child (presumably a Lua debugger fork). */
        if (!ldbRemoveChild(pid)) {
            serverLog(LL_WARNING,
                      "Warning, detected child with unmatched pid: %ld",
                      (long) pid);
        }
    } else {
        /* The tracked child finished: dispatch on its type. */
        if (server.child_type == CHILD_TYPE_RDB) {
            backgroundSaveDoneHandler(exitcode, bysignal);
        } else if (server.child_type == CHILD_TYPE_AOF) {
            backgroundRewriteDoneHandler(exitcode, bysignal);
        } else if (server.child_type == CHILD_TYPE_MODULE) {
            ModuleForkDoneHandler(exitcode, bysignal);
        } else {
            serverPanic("Unknown child type %d for child pid %d", server.child_type, server.child_pid);
            exit(1);
        }
        /* Only collect child info after a clean, non-signalled exit. */
        if (bysignal == 0 && exitcode == 0) receiveChildInfo();
        resetChildState();
    }

    /* start any pending forks immediately. */
    replicationStartPendingFork();
}