redis4.0之利用管道优化aofrewrite

前言

redis的aof持久化本质上是一个redo log,把所有执行过的写命令追加到aof文件中。那么随着redis的运行,aof文件会不断膨胀,当触发收缩条件时就要做aofrewrite。

redis是通过fork子进程来做aofrewrite,同时为了保证aof的连续性,父进程把aofrewrite期间的写命令缓存起来,等收割完子进程之后再追加到新的aof文件。如果期间写入量较大的话收割时就要有大量的写磁盘操作,造成性能下降。

为了提高aofrewrite效率,redis通过在父子进程间建立管道,把aofrewrite期间的写命令通过管道同步给子进程,追加写盘的操作也就转交给了子进程。其实利用管道的优化在3.0时期就已经实现了,最近在总结4.0的新特性,索性就都归到4.0里,方便查阅。

aofrewrite详解

1. aofrewrite的基础实现

redis4.0之利用管道优化aofrewrite

上图是aofrewrite的流程,标注为基本的函数调用关系。

  • 1 - 首先,通过命令或是事件触发aofrewrite,调用rewriteAppendOnlyFileBackground()函数

    • 该函数会fork出一个子进程
  • 2 - 父进程记录子进程的pid并开始缓存写命令

    • 当pid不为-1时就会执行aofRewriteBufferAppend()把写命令缓存起来
  • 3 - 子进程调用rewriteAppendOnlyFile(tmpfile)函数创建新的aof文件

    • 调用rewriteAppendOnlyFileRio()函数遍历redis把所有key-value以命令的方式写入新aof文件
    • 完成后调用exitFromChild(0)退出
  • 4 - 子进程退出后父进程调用backgroundRewriteDoneHandler()来处理

    • 调用aofRewriteBufferWrite()函数把积攒的写命令缓存写入子进程创建的临时aof文件
    • 最后rename()用新的aof文件替换掉原来的aof文件

在aofrewrite过程中,如果redis本身数据量较大子进程执行时间较长,或者写入流量较高,就会导致aof-rewrite-buffer积攒较多,父进程就要进行大量写磁盘操作,这对于redis来说显然是不够高效的。

2. 使用pipe优化

为了提高aofrewrite效率,redis使用pipe来优化,下图中红色标注即为优化的部分:

redis4.0之利用管道优化aofrewrite

优化点:

  • 1 - 父进程建立管道

    • 共三条管道,分别为一条数据管道,和两条控制管道
    • 数据管道用来传输数据,控制管道用来做父子进程交互,控制何时停止数据传输
  • 2 - 父进程向管道写数据

    • 注册写事件aofChildWriteDiffData()向数据管道写数据
  • 3 - 子进程从管道读数据

    • 子进程在生成新aof文件时会定期调用aofReadDiffFromParent()从管道读取数据,并缓存下来

  • 4 - 父子进程交互

    • 子进程生成新aof文件后会通过控制管道向父进程发送"!",发起停止数据传输请求
    • 父进程收到停止信号后激活读事件处理函数aofChildPipeReadable(),设置server.aof_stop_sending_diff=1停止数据传输,并向子进程回复"!",表示同意停止
    • 子进程收到父进程的应答,调用rioWrite()把积攒的数据追加到新的aof文件,最后退出

细心的读者会发现,aofRewriteBufferAppend()和aofRewriteBufferWrite()这一对函数仍然保留,父进程还是要把aof-rewrite-buffer写盘吗?是的,这是因为父子进程是异步结构,父子间总会有那么一点代沟,aof-rewrite-buffer还是需要保留的,不过这个时候父进程写盘的数据量就很小了,几乎可以忽略。

3. aofrewrite代码剖析

aofrewrite的触发条件

    1. 执行bgrewriteaof命令。
    1. serverCron时间事件检测到aof文件大小超限。

命令的触发不必详述,主要来看下serverCron的触发:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
         /* Trigger an AOF rewrite if needed */
         if (server.rdb_child_pid == -1 &&
             server.aof_child_pid == -1 &&
             server.aof_rewrite_perc &&
             server.aof_current_size > server.aof_rewrite_min_size)
         {
            long long base = server.aof_rewrite_base_size ?
                            server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
         }
    ...
}

也就是说aof文件大小超过了server.aof_rewrite_min_size,并且增长率大于server.aof_rewrite_perc时就会触发(增长率计算的基数server.aof_rewrite_base_size是上次aofrewrite完之后aof文件的大小)。

目前云redis设置server.aof_rewrite_min_size为内存规格的1/4,server.aof_rewrite_perc为100。

管道建立

aofrewrite触发之后进入rewriteAppendOnlyFileBackground()函数:

int rewriteAppendOnlyFileBackground(void) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    if (aofCreatePipes() != C_OK) return C_ERR;
    openChildInfoPipe();
    start = ustime();
    if ((childpid = fork()) == 0) {
    ...

OK,重点来了,在fork之前调用了aofCreatePipes()函数来创建管道(openChildInfoPipe()函数只是用来收集子进程copy-on-write用到的内存,就不详细展开了):

int aofCreatePipes(void) {
    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;

    if (pipe(fds) == -1) goto error;   /* parent -> children data. 父进程向子进程写数据的管道*/
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack.  子进程向父进程发起停止传输的控制管道*/
    if (pipe(fds+4) == -1) goto error; /* parent -> children ack.  父进程向子进程回复的控制管道*/
    /* Parent -> children data is non blocking. */
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
    //注册读事件处理函数,负责处理子进程要求停止数据传输的消息

    server.aof_pipe_write_data_to_child = fds[1];      //父进程向子进程写数据的fd
    server.aof_pipe_read_data_from_parent = fds[0];    //子进程从父进程读数据的fd
    server.aof_pipe_write_ack_to_parent = fds[3];      //子进程向父进程发起停止消息的fd
    server.aof_pipe_read_ack_from_child = fds[2];      //父进程从子进程读取停止消息的fd
    server.aof_pipe_write_ack_to_child = fds[5];       //父进程向子进程回复消息的fd
    server.aof_pipe_read_ack_from_parent = fds[4];     //子进程从父进程读取回复消息的fd
    server.aof_stop_sending_diff = 0;                  //是否停止管道传输标记位
    return C_OK;
    ...
}

父进程与管道传输

管道建立起来了我们再来看看fork之后父进程和子进程如何工作,首先看下父进程:

        /* Parent */
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        ...
        server.aof_rewrite_scheduled = 0;
        server.aof_rewrite_time_start = time(NULL);
        server.aof_child_pid = childpid;
        updateDictResizePolicy();
        /* We set appendseldb to -1 in order to force the next call to the
         * feedAppendOnlyFile() to issue a SELECT command, so the differences
         * accumulated by the parent into server.aof_rewrite_buf will start
         * with a SELECT statement and it will be safe to merge. */
        server.aof_selected_db = -1;
        ...

父进程这里做的事情并不多,主要是信息的记录和一些标记位设置

  • 记录fork消耗的时间,info命令可以查看上次fork的耗时latest_fork_usec,单位微秒
  • 设置server.aof_rewrite_scheduled = 0,防止serverCron再次触发aofrewrite
  • 设置server.aof_child_pid为子进程pid,其不为-1时redis才会向aof-rewrite-buffer缓存写命令
  • updateDictResizePolicy()禁止所有hash数据结构resize,这是为了尽量避免子进程copy-on-write进行内存拷贝
  • 设置server.aof_selected_db = -1,下一次的aof日志会强制加上select,这是为了保证命令执行到正确的db

接下来就是缓存写命令和管道通信部分了,入口是在feedAppendOnlyFile():

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    ...
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
    ...
}

server.aof_child_pid在这时就生效了,开始缓存写命令:

void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;

    while(len) {
        /* If we already got at least an allocated block, try appending
         * at least some piece into it. */
        if (block) {
            unsigned long thislen = (block->free < len) ? block->free : len;
            if (thislen) {  /* The current block is not already full. */
                memcpy(block->buf+block->used, s, thislen);
                block->used += thislen;
                block->free -= thislen;
                s += thislen;
                len -= thislen;
            }
        }

        if (len) { /* First block to allocate, or need another block. */
            int numblocks;

            block = zmalloc(sizeof(*block));
            block->free = AOF_RW_BUF_BLOCK_SIZE;
            block->used = 0;
            listAddNodeTail(server.aof_rewrite_buf_blocks,block);

            /* Log every time we cross more 10 or 100 blocks, respectively
             * as a notice or warning. */
            numblocks = listLength(server.aof_rewrite_buf_blocks);
            if (((numblocks+1) % 10) == 0) {
                int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
                                                         LL_NOTICE;
                serverLog(level,"Background AOF buffer size: %lu MB",
                    aofRewriteBufferSize()/(1024*1024));
            }
        }
    }
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}

redis用链表server.aof_rewrite_buf_blocks来缓存aofrewrite期间的写命令,链表的每个节点最大10MB;重点是在最后的写事件注册,当server.aof_pipe_write_data_to_child这个fd没有注册事件时,就注册写事件函数aofChildWriteDiffData:

void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    listNode *ln;
    aofrwblock *block;
    ssize_t nwritten;
    ...
    while(1) {
        ln = listFirst(server.aof_rewrite_buf_blocks);
        block = ln ? ln->value : NULL;
        if (server.aof_stop_sending_diff || !block) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }
        if (block->used > 0) {
            nwritten = write(server.aof_pipe_write_data_to_child,
                             block->buf,block->used);
            if (nwritten <= 0) return;
            memmove(block->buf,block->buf+nwritten,block->used-nwritten);
            block->used -= nwritten;
            block->free += nwritten;
        }
        if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
    }
}

每次事件循环都会把server.aof_rewrite_buf_blocks积攒的写命令全部同步给子进程,除非server.aof_stop_sending_diff被设置了停止标记。

子进程和管道传输

接下来看下子进程:

...
        /* Child */
        char tmpfile[256];
        closeListeningSockets(0);
        redisSetProcTitle("redis-aof-rewrite");
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            ...
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
...

子进程首先关闭监听端口,然后就进入rewriteAppendOnlyFile()函数:

int rewriteAppendOnlyFile(char *filename) {
    ...
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    ...
    server.aof_child_diff = sdsempty();
    ...
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
    ...

首先打开一个临时aof文件,并初始化server.aof_child_diff缓存准备从父进程读数据,然后就调用rewriteAppendOnlyFileRio()来写aof文件和读取管道中的数据:

int rewriteAppendOnlyFileRio(rio *aof) {
    ...
            if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
                processed = aof->processed_bytes;
                aofReadDiffFromParent();
            }
    ...
}

在遍历redis把key-value写入新aof文件过程中,新aof文件每增长10K就会调用aofReadDiffFromParent()从管道中读取数据追加到server.aof_child_diff:

ssize_t aofReadDiffFromParent(void) {
    char buf[65536]; /* Default pipe buffer size on most Linux systems. */
    ssize_t nread, total = 0;

    while ((nread =
            read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
        server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
        total += nread;
    }
    return total;
}

停止管道传输

子进程在遍历完redis生成好新的aof文件之后就要准备退出了,那么退出前要先告诉父进程停止管道传输,依然回到rewriteAppendOnlyFile()函数来看:

int rewriteAppendOnlyFile(char *filename) {
    ...
    /* Ask the master to stop sending diffs. */
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
    if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
        goto werr;
    /* We read the ACK from the server using a 10 seconds timeout. Normally
     * it should reply ASAP, but just in case we lose its reply, we are sure
     * the child will eventually get terminated. */
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
        byte != '!') goto werr;
    serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");

    /* Read the final diff if any. */
    aofReadDiffFromParent();

    /* Write the received diff to the file. */
    serverLog(LL_NOTICE,
        "Concatenating %.2f MB of AOF diff received from parent.",
        (double) sdslen(server.aof_child_diff) / (1024*1024));
    if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
        goto werr;

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;
    ...
}

这里写的就很直接了:

  • 使用write向控制管道写入"!"发起停止请求,然后读取返回结果,超时时间为10s
  • 超时就goto werr异常退出,10s内读取到"!"就继续
  • 再次调用aofReadDiffFromParent()从数据管道读取数据确保管道中没有遗留
  • 最后rioWrite()把server.aof_child_diff积攒的数据追加到新的aof文件

那么父进程是如何处理"!"的呢,还记得之前注册的读事件aofChildPipeReadable()吧,子进程向控制管道发送"!"就会激活:

void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
    char byte;
    ...
    if (read(fd,&byte,1) == 1 && byte == '!') {
        serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
        server.aof_stop_sending_diff = 1;
        if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
            /* If we can't send the ack, inform the user, but don't try again
             * since in the other side the children will use a timeout if the
             * kernel can't buffer our write, or, the children was
             * terminated. */
            serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
                strerror(errno));
        }
    }
    /* Remove the handler since this can be called only one time during a
     * rewrite. */
    aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}

很简单,标记server.aof_stop_sending_diff=1,给子进程回复"!",并且把自己从事件循环删掉,自此父子进程间通信完成,剩下的就是父进程等待子进程退出进行收尾工作。

父进程收尾

serverCron()中会调用wait3()来收割子进程:

    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == -1) {
                serverLog(LL_WARNING,"wait3() returned an error: %s. "
                    "rdb_child_pid = %d, aof_child_pid = %d",
                    strerror(errno),
                    (int) server.rdb_child_pid,
                    (int) server.aof_child_pid);
            } else if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
            } else {
                if (!ldbRemoveChild(pid)) {
                    serverLog(LL_WARNING,
                        "Warning, detected child with unmatched pid: %ld",
                        (long)pid);
                }
            }
            updateDictResizePolicy();
            closeChildInfoPipe();
        }

如果收割到的pid是server.aof_child_pid就进入backgroundRewriteDoneHandler():

void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
    ...
        /* Flush the differences accumulated by the parent to the
         * rewritten AOF. */
        latencyStartMonitor(latency);
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
            (int)server.aof_child_pid);
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        if (newfd == -1) {
            serverLog(LL_WARNING,
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
            goto cleanup;
        }

        if (aofRewriteBufferWrite(newfd) == -1) {
            serverLog(LL_WARNING,
                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            close(newfd);
            goto cleanup;
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);

首先会打开子进程生成的新aof文件,并调用aofRewriteBufferWrite()把server.aof_rewrite_buf_blocks中剩余的数据追加到新aof文件。

        /* Rename the temporary file. This will not unlink the target file if
         * it exists, because we reference it with "oldfd". */
        latencyStartMonitor(latency);
        if (rename(tmpfile,server.aof_filename) == -1) {
            serverLog(LL_WARNING,
                "Error trying to rename the temporary AOF file %s into %s: %s",
                tmpfile,
                server.aof_filename,
                strerror(errno));
            close(newfd);
            if (oldfd != -1) close(oldfd);
            goto cleanup;
        }
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-rename",latency);

之后把新aof文件rename为server.aof_filename记录的文件名。

        /* Asynchronously close the overwritten AOF. */
        if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);

使用bio后台线程来close原来的aof文件。

cleanup:
    aofClosePipes();
    aofRewriteBufferReset();
    aofRemoveTempFile(server.aof_child_pid);
    server.aof_child_pid = -1;
    server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
    server.aof_rewrite_time_start = -1;
    /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
    if (server.aof_state == AOF_WAIT_REWRITE)
        server.aof_rewrite_scheduled = 1;

最后是清理工作,包括关闭管道、重置aof-rewrite-buffer、复位server.aof_child_pid=-1等,自此aofrewrite完成。

后记

基于4.0的云redis已正式上线,登陆阿里云控制台即可开通:https://www.aliyun.com/product/kvstore

上一篇:MySQL数据库快速部署实践


下一篇:NodeJS配置和删除淘宝镜像的方法