演示一个小例子来感受一下RDB持久化
/* 开启一个redis-cli,执行添加数据操作如下 */
127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> set hello world
OK
127.0.0.1:6379> SAVE
OK
接着,我们来看看这个文件中存放着什么数据,保存到磁盘的文件名为dump.rdb
,利用od命令就能查看里面的数据。(od -c:单字节八进制解释进行输出)
# od -c dump.rdb
0000000 R E D I S 0 0 0 9 372 \t r e d i s
0000020 - v e r 005 6 . 0 . 4 372 \n r e d i
0000040 s - b i t s 300 @ 372 005 c t i m e 302
0000060 + 317 244 a 372 \b u s e d - m e m 302 H
0000100 262 \r \0 372 \f a o f - p r e a m b l
0000120 e 300 \0 376 \0 373 001 \0 \0 005 h e l l o 005
0000140 w o r l d 377 346 V 307 376 ? ! 252 \n
0000156
大致可以看到里面有如下信息:
- RDB文件标识和版本号:REDIS0009
- Redis版本:redis-ver 6.0.4
- Redis系统位数(32位或64位):redis-bits s300@
- 系统时间:ctime 302 + 317 244 a
- 内存使用量:used-mem 302 H 262 \r \0
- aof:aof-preamble: 300 https://gist.github.com/antirez/ae068f95c0d084891305
- 一组键值对:hello-word
可以看到第一个不懂的就是出现的372这个数字,这个其实是250的的八进制,表示这是redis的辅助信息,而377对应着就是255的八进制
#define RDB_OPCODE_AUX 250 /* RDB aux field. */
/* Save an AUX field. */
ssize_t rdbSaveAuxField(rio *rdb, void *key, size_t keylen, void *val, size_t vallen) {
ssize_t ret, len = 0;
if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1;
len += ret;
if ((ret = rdbSaveRawString(rdb,key,keylen)) == -1) return -1;
len += ret;
if ((ret = rdbSaveRawString(rdb,val,vallen)) == -1) return -1;
len += ret;
return len;
}
调用rdbSaveAuxField:
ssize_t rdbSaveAuxFieldStrStr(rio *rdb, char *key, char *val) {
ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
调用rdbSaveAuxFieldStrStr
调用rdbSaveAuxFieldStrInt
总函数rdbSaveInfoAuxFields--rdbSaveRio
RDB文件结构
————————————————————————————————————————————
| 文件标识 | 辅助信息 | 数据库 | 结束符 | 校验和 |
————————————————————————————————————————————
文件标识
Redis在每一个RDB文件的首部都写入了如下字符,用来标识这是一个Redis的RDB文件。
—————————————————————
| REDIS | 文件版本号 |
—————————————————————
辅助信息
Redis在新的RDB文件版本上加入了辅助信息,其格式如下:
————————————————————————————————————————————
| redis版本 | 系统位数 | 系统时间 | 已使用的内存 |
————————————————————————————————————————————
例如:在上述的示例中,这些信息对应着:
372
表示是一个辅助信息\t redis-ver
表示后面的数据代表Redis的版本号005 6.0.4
表示当前Redis版本为6.0.4,005代表长度为5\n redis-bits
表示后面的数据为当前Redis服务器的位数300 @
乱码,应该是代表系统为64位005 ctime
表示后面跟着的数据为系统当前时间302 + 317 244 a 372 \b
当前系统时间\b used-mem
表示后面的数据为已使用的内存数302 _ 017 \0
已使用的内存数
数据库
Redis服务器默认有16个数据库,每个数据的信息是一次写入rdb文件中,每个数据库的信息的存放格式如下:
————————————————————————————————————————————————————
| select | dbnum | db_size | expires_size | 键值数据 |
————————————————————————————————————————————————————
其中,select标识当前进行切换数据库操作,后面的dnum表示当前存放的是第dbnum号数据库的数据。示例中的二进制码对应的信息如下:
376 \0
表示切换到第0号数据库; 254373 \1
表示当前数据库中只有一个数据; 251\0
表示当前没有过期键
#define RDB_OPCODE_RESIZEDB 251 /* Hash table resize hint. */
#define RDB_OPCODE_SELECTDB 254 /* DB number of the following keys. */
键值数据
键值数据的存放格式如下:
———————————————————————————————————————————————————————
| 过期键标识 | 时间戳 | 键值对类型 | 键长度 | 键 | 值长度 | 值 |
———————————————————————————————————————————————————————
其中,过期键标识和时间戳是可选项,如果该键设置了过期时间就需要在数据前面加上这些信息。过期键标识由以下两个宏定义给出:
/* 以ms为单位的过期时间 */
#define RDB_OPCODE_EXPIRETIME_MS 252
/* 以s为单位的过期时间 */
#define RDB_OPCODE_EXPIRETIME 253
键值对类型为Redis的五个数据类型,其宏定义如下:
#define RDB_TYPE_STRING 0 // 字符串标识
#define RDB_TYPE_LIST 1 // 链表标识
#define RDB_TYPE_SET 2 // 集合标识
#define RDB_TYPE_ZSET 3 // 有序集合标识
#define RDB_TYPE_HASH 4 // 哈希标识
在示例中,各二进制位代表的含义如下:
\0
标识后面是一个字符串键005 hello
长度为5的字符串hello005 world
长度为5的字符串world
结束符
每个RDB文件都以EOF结束符结尾。上述示例中对应EOF的是:
377
标识EOF标志位
其宏定义如下:
#define RDB_OPCODE_EOF 255
校验和
Redis在每一个RDB文件的末尾加上了采用CRC校验的校验和,二进制中最后一串乱码标识的就是校验和,如果我们用od -cx dump.rdb
就可以更直观的看到检验和为多少。
# od -cx dump.rdb
0000000 R E D I S 0 0 0 9 372 \t r e d i s
4552 4944 3053 3030 fa39 7209 6465 7369
0000020 - v e r 005 6 . 0 . 4 372 \n r e d i
762d 7265 3605 302e 342e 0afa 6572 6964
0000040 s - b i t s 300 @ 372 005 c t i m e 302
2d73 6962 7374 40c0 05fa 7463 6d69 c265
0000060 + 317 244 a 372 \b u s e d - m e m 302 H
cf2b 61a4 08fa 7375 6465 6d2d 6d65 48c2
0000100 262 \r \0 372 \f a o f - p r e a m b l
0db2 fa00 610c 666f 702d 6572 6d61 6c62
0000120 e 300 \0 376 \0 373 001 \0 \0 005 h e l l o 005
c065 fe00 fb00 0001 0500 6568 6c6c 056f
0000140 w o r l d 377 346 V 307 376 ? ! 252 \n
6f77 6c72 ff64 56e6 fec7 213f 0aaa
0000156
0x 0aaa 213f fec7 56e6就代表的是该RDB文件的校验和(校验和以小端模式存储)。
长度编码
在之前的压缩列表和整数集合中就多次见识到Redis为了节省内存做的各种措施,由于C语言中对于指针指向的内存无法计算长度,所以必须将该段内存的大小标识出来。在Redis中,有很多长度信息需要保存,如字符串的长度,链表的长度,数据库的大小等,针对不同大小的长度数据,Redis会使用不同的编码格式来节省内存。我们先来看看这些宏定义
/* Defines related to the dump file format. To store 32 bits lengths for short
* keys requires a lot of space, so we check the most significant 2 bits of
* the first byte to interpreter the length:
*
* 00|XXXXXX => if the two MSB are 00 the len is the 6 bits of this byte
* 01|XXXXXX XXXXXXXX => 01, the len is 14 bits, 6 bits + 8 bits of next byte
* 10|000000 [32 bit integer] => A full 32 bit len in net byte order will follow
* 10|000001 [64 bit integer] => A full 64 bit len in net byte order will follow
* 11|OBKIND this means: specially encoded object will follow. The six bits
* number specify the kind of object that follows.
* See the RDB_ENC_* defines.
*
* Lengths up to 63 are stored using a single byte, most DB keys, and may
* values, will fit inside. */
#define RDB_6BITLEN 0
#define RDB_14BITLEN 1
#define RDB_32BITLEN 0x80
#define RDB_64BITLEN 0x81
#define RDB_ENCVAL 3
#define RDB_LENERR UINT64_MAX
redis中存长度的编码:
#define htonu64(v) (v)
/* Saves an encoded length. The first two bits in the first byte are used to
* hold the encoding type. See the RDB_* definitions for more information
* on the types of encoding. */
int rdbSaveLen(rio *rdb, uint64_t len) {
unsigned char buf[2];
size_t nwritten;
if (len < (1<<6)) { //长度小于32
/* Save a 6 bit len */
buf[0] = (len&0xFF)|(RDB_6BITLEN<<6); (11111111 & len) | 00000000
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
nwritten = 1; //长度只需要一个字节
} else if (len < (1<<14)) {
/* Save a 14 bit len */
buf[0] = ((len>>8)&0xFF)|(RDB_14BITLEN<<6);
buf[1] = len&0xFF;
//比如 len = 00100000 10000000
//buf[0] = (00100000 & 1111111) | 01000000
//buf[1] = 10000000 & 0xFF;
if (rdbWriteRaw(rdb,buf,2) == -1) return -1;
nwritten = 2;
} else if (len <= UINT32_MAX) {
/* Save a 32 bit len */
buf[0] = RDB_32BITLEN; 0x80
if (rdbWriteRaw(rdb,buf,1) == -1) return -1; 写一个
uint32_t len32 = htonl(len);
if (rdbWriteRaw(rdb,&len32,4) == -1) return -1; 在写四个
nwritten = 1+4;
} else {
/* Save a 64 bit len */
buf[0] = RDB_64BITLEN;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
len = htonu64(len);
if (rdbWriteRaw(rdb,&len,8) == -1) return -1;
nwritten = 1+8;
}
return nwritten;
}
static ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) {
if (rdb && rioWrite(rdb,p,len) == 0)
return -1;
return len;
}
Rio包即为Robust io函数包。包中函数是对Linux基本I/O函数的封装,使其更加健壮、高效,更适用于网络编程。
https://www.cxyzjd.com/article/weixin_42709632/104785618
C语言htonl()函数:将32位主机字符顺序转换成网络字符顺序
特殊编码
特殊编码主要是将一些用字符串表示的小整数转换成整数编码,以节省内存,比如”12”,”-1”等。Redis对于这些小整数类型的字符串有以下几种不同的编码格式,用宏定义指出。
/* When a length of a string object stored on disk has the first two bits
* set, the remaining six bits specify a special encoding for the object
* accordingly to the following defines: */
#define RDB_ENC_INT8 0 /* 8 bit signed integer */
#define RDB_ENC_INT16 1 /* 16 bit signed integer */
#define RDB_ENC_INT32 2 /* 32 bit signed integer */
#define RDB_ENC_LZF 3 /* string compressed with FASTLZ */
11|0000|00 00000000 // 后面八字节表示该整数
11|0000|01 00000000 00000000 // 后面16字节表示该整数
11|0000|10 [32 bits] // 后面32位表示该整数
11|0000|11 // 表示LZF压缩后的数据
所以,存储一个能用八字节表示字符串有符整数需要2位;存储一个能用16字节表示的有符整数需要3字节;存储一个能用32字节表示的有符整数需要5个字节。
特殊编码的实现由rdbTryIntegerEncoding和rdbEncodeInteger函数完成。
#define RDB_ENCVAL 3
/* String objects in the form "2391" "-100" without any space and with a
* range of values that can fit in an 8, 16 or 32 bit signed value can be
* encoded as integers to save space */
int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) {
long long value;
if (string2ll(s, len, &value)) {
return rdbEncodeInteger(value, enc);
} else {
return 0;
}
}
/* Encodes the "value" argument as integer when it fits in the supported ranges
* for encoded types. If the function successfully encodes the integer, the
* representation is stored in the buffer pointer to by "enc" and the string
* length is returned. Otherwise 0 is returned. */
int rdbEncodeInteger(long long value, unsigned char *enc) {
if (value >= -(1<<7) && value <= (1<<7)-1) {
enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT8; 11000000 | 0
enc[1] = value&0xFF;
return 2;
} else if (value >= -(1<<15) && value <= (1<<15)-1) {
enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT16;
enc[1] = value&0xFF;
enc[2] = (value>>8)&0xFF;
return 3;
} else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT32;
enc[1] = value&0xFF;
enc[2] = (value>>8)&0xFF;
enc[3] = (value>>16)&0xFF;
enc[4] = (value>>24)&0xFF;
return 5;
} else {
return 0;
}
}
LZF编码
当Redis开启了字符串压缩功能且字符串长度大于20bytes时,会采用LZF编码对其进行压缩,开启字符串压缩功能的变量为:
rdbcompression yes // redis.conf中设定的,默认为开启状态
当字符串写入RDB文件时,判断上述条件成立与否,进而选择编码格式,其源码片段如下:
/* Save a string object as [len][data] on disk. If the object is a string
* representation of an integer value we try to save it in a special form */
ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) {
......
/* Try LZF compression - under 20 bytes it's unable to compress even
* aaaaaaaaaaaaaaaaaa so skip it */
if (server.rdb_compression && len > 20) {
n = rdbSaveLzfStringObject(rdb,s,len);
if (n == -1) return -1;
if (n > 0) return n;
/* Return value of 0 means data can't be compressed, save the old way */
}
真正执行编码操作的函数是rdbSaveLzfStringObject,其按照上述的编码格式对数据进行压缩。
ssize_t rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) {
size_t comprlen, outlen;
void *out;
/* We require at least four bytes compression for this to be worth it */
if (len <= 4) return 0;
outlen = len-4;
if ((out = zmalloc(outlen+1)) == NULL) return 0;
comprlen = lzf_compress(s, len, out, outlen);
if (comprlen == 0) {
zfree(out);
return 0;
}
ssize_t nwritten = rdbSaveLzfBlob(rdb, out, comprlen, len);
zfree(out);
return nwritten;
}
ssize_t rdbSaveLzfBlob(rio *rdb, void *data, size_t compress_len,
size_t original_len) {
unsigned char byte;
ssize_t n, nwritten = 0;
/* Data compressed! Let's save it on disk */
byte = (RDB_ENCVAL<<6)|RDB_ENC_LZF;
if ((n = rdbWriteRaw(rdb,&byte,1)) == -1) goto writeerr;
nwritten += n;
if ((n = rdbSaveLen(rdb,compress_len)) == -1) goto writeerr;
nwritten += n;
if ((n = rdbSaveLen(rdb,original_len)) == -1) goto writeerr;
nwritten += n;
if ((n = rdbWriteRaw(rdb,data,compress_len)) == -1) goto writeerr;
nwritten += n;
return nwritten;
writeerr:
return -1;
}
从源码中可以看出,经过LZF算法压缩的字符串在内存中的布局如下:
——————————————————————————————————————————————————————
| LZF标识(11000011) | 压缩后的长度 | 原长度 | 压缩后的数据 |
——————————————————————————————————————————————————————
对象编码
/* Save the object type of object "o". */
int rdbSaveObjectType(rio *rdb, robj *o) {
switch (o->type) {
case OBJ_STRING:
return rdbSaveType(rdb,RDB_TYPE_STRING);
case OBJ_LIST:
if (o->encoding == OBJ_ENCODING_QUICKLIST)
return rdbSaveType(rdb, RDB_TYPE_LIST_QUICKLIST_2);
else
serverPanic("Unknown list encoding");
case OBJ_SET:
if (o->encoding == OBJ_ENCODING_INTSET)
return rdbSaveType(rdb,RDB_TYPE_SET_INTSET);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_SET);
else
serverPanic("Unknown set encoding");
case OBJ_ZSET:
if (o->encoding == OBJ_ENCODING_LISTPACK)
return rdbSaveType(rdb,RDB_TYPE_ZSET_LISTPACK);
else if (o->encoding == OBJ_ENCODING_SKIPLIST)
return rdbSaveType(rdb,RDB_TYPE_ZSET_2);
else
serverPanic("Unknown sorted set encoding");
case OBJ_HASH:
if (o->encoding == OBJ_ENCODING_LISTPACK)
return rdbSaveType(rdb,RDB_TYPE_HASH_LISTPACK);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_HASH);
else
serverPanic("Unknown hash encoding");
case OBJ_STREAM:
return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
case OBJ_MODULE:
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
default:
serverPanic("Unknown object type");
}
return -1; /* avoid warning */
}
String对象编码
«««< HEAD
string对象的底层编码有三种,分别是OBJ_ENCODING_INT
、OBJ_ENCODING_RAW
和OBJ_ENCODING_EMBSTR
,这三种编码的不同之处各位可以跳转复习一下。在写入RDB文件时,会判断String对象的编码类型,从而选择以何种编码方式写入到RDB文件中。字符串是按照如下三种格式存放在RDB文件中的。
=======
前面在[Redis源码剖析–字符串t_string一文中,有介绍到string对象的底层编码有三种,分别是OBJ_ENCODING_INT
、OBJ_ENCODING_RAW
和OBJ_ENCODING_EMBSTR
,这三种编码的不同之处各位可以跳转复习一下。在写入RDB文件时,会判断String对象的编码类型,从而选择以何种编码方式写入到RDB文件中。字符串是按照如下三种格式存放在RDB文件中的。
eb5e8e60cc4fa97f186b143899c1f06f1f68ac0a
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
ssize_t n = 0, nwritten = 0;
if (o->type == OBJ_STRING) {
/* Save a string value */
if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
str的int为一种,raw和embstr为另一种
/* Like rdbSaveRawString() gets a Redis object instead. */
ssize_t rdbSaveStringObject(rio *rdb, robj *obj) {
/* Avoid to decode the object, then encode it again, if the
* object is already integer encoded. */
if (obj->encoding == OBJ_ENCODING_INT) {
return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr);
} else {
serverAssertWithInfo(NULL,obj,sdsEncodedObject(obj));
return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr));
}
}
下面就是两类不同encoding的实现
/* Save a long long value as either an encoded string or a string. */
ssize_t rdbSaveLongLongAsStringObject(rio *rdb, long long value) {
unsigned char buf[32];
ssize_t n, nwritten = 0;
int enclen = rdbEncodeInteger(value,buf); //尝试编码为整数
if (enclen > 0) {
return rdbWriteRaw(rdb,buf,enclen); //直接返回
} else {
/* Encode as string */
enclen = ll2string((char*)buf,32,value);
serverAssert(enclen < 32); 32位
if ((n = rdbSaveLen(rdb,enclen)) == -1) return -1; //记录长度
nwritten += n;
if ((n = rdbWriteRaw(rdb,buf,enclen)) == -1) return -1; //写入值
nwritten += n;
}
return nwritten;
}
/* Save a string object as [len][data] on disk. If the object is a string
* representation of an integer value we try to save it in a special form */
ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) {
int enclen;
ssize_t n, nwritten = 0;
/* Try integer encoding */ //先尝试下能否转换为intencoding
if (len <= 11) {
unsigned char buf[5];
if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {
if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1;
return enclen;
}
}
/* Try LZF compression - under 20 bytes it's unable to compress even
* aaaaaaaaaaaaaaaaaa so skip it */
if (server.rdb_compression && len > 20) { //如果长度大于20
n = rdbSaveLzfStringObject(rdb,s,len);
if (n == -1) return -1;
if (n > 0) return n;
/* Return value of 0 means data can't be compressed, save the old way */
}
/* Store verbatim */ 逐字
if ((n = rdbSaveLen(rdb,len)) == -1) return -1;
nwritten += n;
if (len > 0) {
if (rdbWriteRaw(rdb,s,len) == -1) return -1;
nwritten += len;
}
return nwritten;
}
/* String objects in the form "2391" "-100" without any space and with a
* range of values that can fit in an 8, 16 or 32 bit signed value can be
* encoded as integers to save space */
int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) {
long long value;
if (string2ll(s, len, &value)) {
return rdbEncodeInteger(value, enc);
} else {
return 0;
}
}
字符串是按照如下三种格式存放在RDB文件中的。
/* 按照字符串编码的形式 */
————————————————
| len | data |
————————————————
/* 按照INT编码的形式 */
——————————————————
| Encoding | int |
——————————————————
/* 按照LZF压缩后的形式 */
————————————————————————————————————————————
| LZF标识 | 压缩后的长度 | 原长度 | 压缩后的数据 |
————————————————————————————————————————————
List对象编码
List的底层编码只有quicklist
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
......
} else if (o->type == OBJ_LIST) {
/* Save a list value */
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = o->ptr;
quicklistNode *node = ql->head; //从头开始遍历
if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1; 先存下quicklist的节点的长度
nwritten += n;
while(node) {
if ((n = rdbSaveLen(rdb,node->container)) == -1) return -1; //存下node类型
nwritten += n;
if (quicklistNodeIsCompressed(node)) { //如果是经过压缩的节点
void *data;
size_t compress_len = quicklistGetLzf(node, &data);
if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;
nwritten += n;
} else {
if ((n = rdbSaveRawString(rdb,node->entry,node->sz)) == -1) return -1; //直接存entry
nwritten += n;
}
node = node->next;
}
} else {
serverPanic("Unknown list encoding");
}
其存放格式如下:
————————————————————————————————————————————————————————————————————————
| listLength | len1| data1 | len2 | CompressLength| OriginLength | data2 |
————————————————————————————————————————————————————————————————————————
Set对象编码
set的底层采用字典或者整数集合的编码形式
} else if (o->type == OBJ_SET) {
/* Save a set value */
if (o->encoding == OBJ_ENCODING_HT) {
dict *set = o->ptr;
dictIterator *di = dictGetIterator(set);
dictEntry *de;
if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) { //存入dict的size
dictReleaseIterator(di);
return -1;
}
nwritten += n;
while((de = dictNext(di)) != NULL) {
sds ele = dictGetKey(de);
if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele))) //
== -1)
{
dictReleaseIterator(di);
return -1;
}
nwritten += n;
}
dictReleaseIterator(di);
} else if (o->encoding == OBJ_ENCODING_INTSET) {
size_t l = intsetBlobLen((intset*)o->ptr);
if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1; //直接写入intset的内容
nwritten += n;
} else {
serverPanic("Unknown set encoding");
}
}
—————————————————————————————————————————
| setSize | elem1 | elem2 | ... | elemN |
—————————————————————————————————————————
/* 集合存储示例 */
————————————————————————————————————————————
| 3 | 3 | "zee" | 5 | "coder" | 5 | "cheng" |
————————————————————————————————————————————
Zset对象编码
zset采用zskiplist或者ziplist编码
} else if (o->type == OBJ_ZSET) {
/* Save a sorted set value */
if (o->encoding == OBJ_ENCODING_LISTPACK) {
//listpack后续分析
size_t l = lpBytes((unsigned char*)o->ptr);
if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
nwritten += n;
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
zskiplist *zsl = zs->zsl;
if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1;
nwritten += n;
/* We save the skiplist elements from the greatest to the smallest
* (that's trivial since the elements are already ordered in the
* skiplist): this improves the load process, since the next loaded
* element will always be the smaller, so adding to the skiplist
* will always immediately stop at the head, making the insertion
* O(1) instead of O(log(N)). */
//从后往前,用bw更快
zskiplistNode *zn = zsl->tail;
while (zn != NULL) {
if ((n = rdbSaveRawString(rdb,
(unsigned char*)zn->ele,sdslen(zn->ele))) == -1)
{
return -1;
}
nwritten += n;
if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1)
return -1;
nwritten += n;
zn = zn->backward;
}
} else {
serverPanic("Unknown sorted set encoding");
}
———————————————————————————————————————————————————————————————————————
| zset_length | elem1 | score1 | elem2 | score2 | ... | elem3 | score3 |
———————————————————————————————————————————————————————————————————————
Hash对象编码
} else if (o->type == OBJ_HASH) {
/* Save a hash value */
if (o->encoding == OBJ_ENCODING_LISTPACK) {
size_t l = lpBytes((unsigned char*)o->ptr);
if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
nwritten += n;
} else if (o->encoding == OBJ_ENCODING_HT) {
dictIterator *di = dictGetIterator(o->ptr);
dictEntry *de;
if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) {
dictReleaseIterator(di);
return -1;
}
nwritten += n;
while((de = dictNext(di)) != NULL) {
sds field = dictGetKey(de);
sds value = dictGetVal(de);
if ((n = rdbSaveRawString(rdb,(unsigned char*)field,
sdslen(field))) == -1)
{
dictReleaseIterator(di);
return -1;
}
nwritten += n;
if ((n = rdbSaveRawString(rdb,(unsigned char*)value,
sdslen(value))) == -1)
{
dictReleaseIterator(di);
return -1;
}
nwritten += n;
}
dictReleaseIterator(di);
} else {
serverPanic("Unknown hash encoding");
}
—————————————————————————————————————————————————
| hashSize | key1 | value1| .... | key2 | value2 |
—————————————————————————————————————————————————
还有之前没有分析过的stream和module,之后分析
} else if (o->type == OBJ_STREAM) {
/* Store how many listpacks we have inside the radix tree. */
stream *s = o->ptr;
rax *rax = s->rax;
if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1;
nwritten += n;
/* Serialize all the listpacks inside the radix tree as they are,
* when loading back, we'll use the first entry of each listpack
* to insert it back into the radix tree. */
raxIterator ri;
raxStart(&ri,rax);
raxSeek(&ri,"^",NULL,0);
while (raxNext(&ri)) {
unsigned char *lp = ri.data;
size_t lp_bytes = lpBytes(lp);
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
}
raxStop(&ri);
/* Save the number of elements inside the stream. We cannot obtain
* this easily later, since our macro nodes should be checked for
* number of items: not a great CPU / space tradeoff. */
if ((n = rdbSaveLen(rdb,s->length)) == -1) return -1;
nwritten += n;
/* Save the last entry ID. */
if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1;
nwritten += n;
if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
nwritten += n;
/* The consumer groups and their clients are part of the stream
* type, so serialize every consumer group. */
/* Save the number of groups. */
size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0;
if ((n = rdbSaveLen(rdb,num_cgroups)) == -1) return -1;
nwritten += n;
if (num_cgroups) {
/* Serialize each consumer group. */
raxStart(&ri,s->cgroups);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamCG *cg = ri.data;
/* Save the group name. */
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
/* Last ID. */
if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
/* Save the global PEL. */
if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
/* Save the consumers of this group. */
if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
}
raxStop(&ri);
}
} else if (o->type == OBJ_MODULE) {
/* Save a module-specific value. */
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
/* Write the "module" identifier as prefix, so that we'll be able
* to call the right module during loading. */
int retval = rdbSaveLen(rdb,mt->id);
if (retval == -1) return -1;
moduleInitIOContext(io,mt,rdb,key,dbid);
io.bytes += retval;
/* Then write the module-specific representation + EOF marker. */
mt->rdb_save(&io,mv->value);
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
if (retval == -1)
io.error = 1;
else
io.bytes += retval;
if (io.ctx) {
moduleFreeContext(io.ctx);
zfree(io.ctx);
}
return io.error ? -1 : (ssize_t)io.bytes;
} else {
serverPanic("Unknown object type");
}