Redis 3.0.4 RDB持久化

  跳过了数据库等别的内容,跳到了RDB,别的之后会补回来、

  RDB持久化,是将redis内存中的数据库状态保存到磁盘里,以免数据意外丢失,可以手动执行,也可以通过配置执行。

    手动执行:

    1. SAVE: 会阻塞服务器进程,直到RDB文件创建完毕。
    2. BGSAVE:会fork一个子进程,然后由子进程负责创建RDB文件,父进程就需处理命令请求。

    触发配置:

      • save 900 1            //服务器900s之内,对数据库进行了一次修改
      • save 300 10      //服务器300s之内,对数据库进行了10次修改
      • save 60 10000      //服务器60秒之内,对数据库进行了至少10000次修改

        满足任意一条都会触发bgsave操作

  bgsave命令执行时的服务器状态

    在执行bgsave期间,服务器处理save、bgsave、aofwrite三个命令和平时不同。

      1.如果在执行bgsave期间,执行save,服务器会拒绝掉,避免父进程和子进程同时执行两个rdbsave,防止产生竞争条件。

      2.如果在执行bgsave期间,执行bgsave,服务器也会拒绝掉,因为同时执行两个bgsave会产生竞争条件。

      3.如果在执行bgsave期间,执行aof write,aof write会被延迟到bgsave命令执行完毕之后。

      4.如果在执行aof write期间,执行bgsave,服务器会拒绝掉。

void bgsaveCommand(redisClient *c) {
    //正在执行rdb持久化操作
    if (server.rdb_child_pid != -1) {
        addReplyError(c,"Background save already in progress");
    } else if (server.aof_child_pid != -1) {  //正在aof重写
        addReplyError(c,"Can‘t BGSAVE while AOF log rewriting is in progress");
    } else if (rdbSaveBackground(server.rdb_filename) == REDIS_OK) {
        addReplyStatus(c,"Background saving started");   //执行bgsave
    } else {
        addReply(c,shared.err);
    }
}

 1.RDB文件结构

Redis 3.0.4 RDB持久化      

      1.REDIS:表示保存着“redis”五个字符,长度为5字节

      2.db_version:表示rdb的版本号,长度为4字节

      3.databases:包含零个或者任意多个数据库,一个各个数据库的键值对

      4.EOF:表示rdb文件的结束,1个字节

      6.check_sum:保存着校验和,8个字节

  databases部分

Redis 3.0.4 RDB持久化

      1.selectdb:表示接下来要读取的是一个db_number,1个字节

      2.db_number:保存着一个数据库号码,长度可能是1字节、2字节、5字节

      3.key_value_paris:保存着数据库中的键值对

  2.bgsave

    bgsave是fork了子进程,子进程中执行rdbSave,

int rdbSaveBackground(char *filename) {
    pid_t childpid;
    long long start;
    //查询是否正在进行备份  没有查是否在重写
    if (server.rdb_child_pid != -1) return REDIS_ERR;
    //备份当前数据库的脏键值
    server.dirty_before_bgsave = server.dirty;
    //备份执行的时间
    server.lastbgsave_try = time(NULL);

    start = ustime();
    if ((childpid = fork()) == 0) {
        int retval;

        /* Child */
        //关闭子进程的监听套接字
        closeListeningSockets(0);
        //设置进程标题
        redisSetProcTitle("redis-rdb-bgsave");
        //执行保存操作  将数据库的写到filename中
        retval = rdbSave(filename);
        if (retval == REDIS_OK) {
            //得到子进程的脏私有虚拟页面大小
            //这是进程fork之后子进程多占用的内存,从/proc/$pid/smaps中读取Private_Dirty字段的值
            //进程fork之后,开始内存是共享的,即从父进程那里继承的内存空间都是Private_Clean,
            //运行一段时间之后,子进程对继承的内存空间做了修改,这部分内存就不能与父进程共享了,
            //需要多占用,这部分就是Private_Dirty
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                //将紫禁城分配的内容写到日志
                redisLog(REDIS_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
        }
        exitFromChild((retval == REDIS_OK) ? 0 : 1);
    } else {
        /* Parent */
        //计算fork执行的时间
        server.stat_fork_time = ustime()-start;
        //计算fork的速率 GB/每秒
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        if (childpid == -1) {  //fork出错
            server.lastbgsave_status = REDIS_ERR;
            redisLog(REDIS_WARNING,"Can‘t save in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }

        redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
        server.rdb_save_time_start = time(NULL);  //备份开始的时间
        server.rdb_child_pid = childpid;          //备份的子进程
        server.rdb_child_type = REDIS_RDB_CHILD_TYPE_DISK;
        updateDictResizePolicy();  //关闭哈希表resize 因为resize会有复制拷贝动作
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success. */
int rdbSave(char *filename) {
    char tmpfile[256];
    FILE *fp;
    rio rdb;
    int error;
    //创建临时文件
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
            strerror(errno));
        return REDIS_ERR;
    }
    //初始化一个rio对象  该对象是一个文件对象IO
    rioInitWithFile(&rdb,fp);
    //将数据库的内容写到rio中
    if (rdbSaveRio(&rdb,&error) == REDIS_ERR) {
        errno = error;
        goto werr;
    }

    /* Make sure data will not remain on the OS‘s output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    //更改备份的临时文件名称 
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"DB saved on disk");
    server.dirty = 0;
    //重置脏数据
    server.lastsave = time(NULL);
    server.lastbgsave_status = REDIS_OK;
    return REDIS_OK;

werr:
    //rdbSaveRio函数的写入错误处理  写日志 关闭文件 删除临时文件
    redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));   //这块看过一些实例  确实有备份失败的日志输出
    fclose(fp);
    unlink(tmpfile);
    return REDIS_ERR;
}
//将rdb格式文件内容写入到rio中
//rdb编码格式: redis version 辅助信息 databases EOF checksum
//     字节数:  5     4                          1      8
int rdbSaveRio(rio *rdb, int *error) {
    dictIterator *di = NULL;
    dictEntry *de;
    char magic[10];
    int j;
    long long now = mstime();
    uint64_t cksum;
    //开启了校验和选项
    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;
    //将redis + version 写入到magic
    snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION); //redis + version 
    //将magic保存到rdb 
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
    //遍历redis db
    for (j = 0; j < server.dbnum; j++) {
        redisDb *db = server.db+j;  //当前的数据库指针

        dict *d = db->dict;         //当数据库的键值对字典
        if (dictSize(d) == 0) continue;  //跳过空字典
        //创建一个字典类型迭代器
        di = dictGetSafeIterator(d);
        if (!di) return REDIS_ERR;

        /* Write the SELECT DB opcode */
        //写入数据库的标识 selectDB 下一个值是redis的db数
        if (rdbSaveType(rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
        //写入redis db数
        if (rdbSaveLen(rdb,j) == -1) goto werr;

        /* Iterate this DB writing every entry */
        //遍历数据库所有键值对
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;

            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            //将键的键对象,值对象,过期时间写到rio中
            if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr;
        }
        //地方字典迭代器
        dictReleaseIterator(di);
    }
    di = NULL; /* So that we don‘t release it again on error. */
    /* EOF opcode */
    //EOF 1字节
    if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;

    /* CRC64 checksum. It will be zero if checksum computation is disabled, the
     * loading code skips the check in this case. */
    //校验和
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return REDIS_OK;

werr:
    if (error) *error = errno;
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}

   rdbSave函数创建了一个临时文件,然后使用该临时文件的文件指针fp初始化rio结构rdb,该结构是Redis中用于IO操作的数据结构,主要是封装了read和write操作。

然后调用rdbSaveRio,将数据库的内容写到临时文件中;之后调用fflush,fsync和fclose,保证数据已经写入到硬盘上,并且关闭临时文件。

   3.redis启动时加载rdb

  

//将指定的rdb文件读到数据库中
int rdbLoad(char *filename) {
    uint32_t dbid;
    int type, rdbver;
    redisDb *db = server.db+0;
    char buf[1024];
    long long expiretime, now = mstime();
    FILE *fp;
    rio rdb;

    if ((fp = fopen(filename,"r")) == NULL) return REDIS_ERR;
    //创建一个rio
    rioInitWithFile(&rdb,fp);
    rdb.update_cksum = rdbLoadProgressCallback;
    rdb.max_processing_chunk = server.loading_process_events_interval_bytes;
    if (rioRead(&rdb,buf,9) == 0) goto eoferr;
    buf[9] = \0;
    //校验redis
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        errno = EINVAL;
        return REDIS_ERR;
    }
    //校验版本
    rdbver = atoi(buf+5);
    if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can‘t handle RDB format version %d",rdbver);
        errno = EINVAL;
        return REDIS_ERR;
    }

    startLoading(fp);
    while(1) {
        robj *key, *val;
        expiretime = -1;

        /* Read type. */
        if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
        //key的过期时间
        if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
            /* the EXPIRETIME opcode specifies time in seconds, so convert
             * into milliseconds. */
            expiretime *= 1000;
        } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
            /* Milliseconds precision expire times introduced with RDB
             * version 3. */
            if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
        }
        //EOF
        if (type == REDIS_RDB_OPCODE_EOF)
            break;

        /* Handle SELECT DB opcode as a special case */
        //select db 
        if (type == REDIS_RDB_OPCODE_SELECTDB) {
            if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            continue;
        }
        /* Read key */
        if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
        /* Read value */
        if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
        /* Check if the key already expired. This function is used when loading
         * an RDB file from disk, either at startup, or when an RDB was
         * received from the master. In the latter case, the master is
         * responsible for key expiry. If we would expire keys here, the
         * snapshot taken by the master may not be reflected on the slave. */
        //主库加载rdb  会过滤当前key是否过期  从库不会
        if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
            decrRefCount(key);
            decrRefCount(val);
            continue;
        }
        /* Add the new object in the hash table */
        dbAdd(db,key,val);
        /* Set the expire time if needed */
        if (expiretime != -1) setExpire(db,key,expiretime);

        decrRefCount(key);
    }
    /* Verify the checksum if RDB version is >= 5 */
    //rdb版本大于5 开启校验和
    if (rdbver >= 5 && server.rdb_checksum) {
        uint64_t cksum, expected = rdb.cksum;

        if (rioRead(&rdb,&cksum,8) == 0) goto eoferr;
        memrev64ifbe(&cksum);
        if (cksum == 0) {
            redisLog(REDIS_WARNING,"RDB file was saved with checksum disabled: no check performed.");
        } else if (cksum != expected) {
            redisLog(REDIS_WARNING,"Wrong RDB checksum. Aborting now.");
            exit(1);
        }
    }

    fclose(fp);
    stopLoading();
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}

  根据rdb格式进行解析和读取。

    

Redis 3.0.4 RDB持久化

上一篇:Windows10专业版身份验证错误,可能由于CredSSP加密数据库修正


下一篇:分布式存储——MySQL