Redis 数据结构 压缩列表(ziplist)
Redis是内存数据库,为了节省内存,在一些条件下使用了紧凑型数据结构,其中一个就是压缩列表(ziplist)
(1) 压缩列表(ziplist)是什么
ziplist是一种特殊编码的双向链表,旨在提高内存效率。
它存储字符串和整数值,其中整数被编码为实际整数而不是一系列字符。
它允许在O(1)时间内对列表的任一侧进行推送和弹出操作。
但是,由于每个操作都需要重新分配 ziplist 使用的内存,因此实际的复杂性与 ziplist 使用的内存量有关。
(1.1) 压缩列表结构
(2) 为什么要使用压缩列表
压缩列表可以节约内存
如果使用链表存储元素,链表节点里需要 val、next,而使用压缩列表,只需要存储val即可。
(2.1) 压缩列表特点
ziplist 的特点:
- 连续内存存储:每个元素紧凑排列,内存利用率高
- 变长编码:存储数据时,采用变长编码(满足数据长度的前提下,尽可能少分配内存)
3)寻找元素需遍历:存放太多元素,性能会下降(适合少量数据存储) - 级联更新:更新、删除元素,会引发级联更新(因为内存连续,前面数据膨胀/删除了,后面要跟着一起动)
(3) 压缩列表原理
area |<---- ziplist header ---->|<----------- entries ------------->|<-end->|
size 4 bytes 4 bytes 2 bytes ? ? ? ? 1 byte
+---------+--------+-------+--------+--------+--------+--------+-------+
component | zlbytes | zltail | zllen | entry1 | entry2 | ... | entryN | zlend |
+---------+--------+-------+--------+--------+--------+--------+-------+
^ ^ ^
address | | |
ZIPLIST_ENTRY_HEAD | ZIPLIST_ENTRY_END
|
ZIPLIST_ENTRY_TAIL
压缩列表是一块连续的内存空间。
为了方便查询,在头部记录了压缩列表占用的总字节数、entry尾部的地址 以及 压缩列表的元素个数。
字段 | 长度 | 作用 |
---|---|---|
zlbytes | 4字节 | 整个ziplist占用的内存字节数,对ziplist进行内存重分配,或者计算末端时使用。 |
zltail | 4字节 | ziplist尾节点的偏移量。通过这个偏移量,可以快速获取尾部节点。 |
zllen | 2字节 | 快速获取ziplist长度 |
zlend | 1字节 | 结束符 |
(3.1) 创建新的ziplist
area |<---- ziplist header ---->|<-- end -->|
size 4 bytes 4 bytes 2 bytes 1 byte
+---------+--------+-------+-----------+
component | zlbytes | zltail | zllen | zlend |
| | | | |
value | 1011 | 1010 | 0 | 1111 1111 |
+---------+--------+-------+-----------+
^
|
ZIPLIST_ENTRY_HEAD
&
address ZIPLIST_ENTRY_TAIL
&
ZIPLIST_ENTRY_END
(3.2) 将节点添加到末端
将新节点添加到 ziplist 的末端需要执行以下几个步骤:
- 记录到达 ziplist 末端所需的偏移量(因为之后的内存重分配可能会改变 ziplist 的地址,因此记录偏移量而不是保存指针)
- 根据新节点要保存的值,计算出编码这个值所需的空间大小,以及编码它前一个节点的长度所需的空间大小,然后对 ziplist 进行内存重分配。
- 设置新节点的各项属性: pre_entry_length 、 encoding 、 length 和 content 。
- 更新 ziplist 的各项属性,比如记录空间占用的 zlbytes ,到达表尾节点的偏移量 zltail ,以及记录节点数量的 zllen 。
area |<---- ziplist header ---->|<--- entries -->|<-- end -->|
size 4 bytes 4 bytes 2 bytes 5 bytes 1 bytes
+---------+--------+-------+----------------+-----------+
component | zlbytes | zltail | zllen | entry 1 | zlend |
| | | | | |
value | 10000 | 1010 | 1 | ? | 1111 1111 |
+---------+--------+-------+----------------+-----------+
^ ^
| |
address ZIPLIST_ENTRY_HEAD ZIPLIST_ENTRY_END
&
ZIPLIST_ENTRY_TAIL
(4) 压缩列表源码解析
压缩列表
(4.1) 压缩列表结构
area |<---- ziplist header ---->|<----------- entries ------------->|<-end->|
size 4 bytes 4 bytes 2 bytes ? ? ? ? 1 byte
+---------+--------+-------+--------+--------+--------+--------+-------+
component | zlbytes | zltail | zllen | entry1 | entry2 | ... | entryN | zlend |
+---------+--------+-------+--------+--------+--------+--------+-------+
^ ^ ^
address | | |
ZIPLIST_ENTRY_HEAD | ZIPLIST_ENTRY_END
|
ZIPLIST_ENTRY_TAIL
如果没有特殊指定,所有字段存储都使用小端存储。
参考ziplistNew()
函数
entry节点结构
area |<------------------- entry -------------------->|
+------------------+----------+--------+---------+
component | pre_entry_length | encoding | length | content |
+------------------+----------+--------+---------+
pre_entry_length 记录了前一个节点的长度,通过这个值,可以进行指针计算,从而跳转到上一个节点。
/*
* 我们使用此函数接收有关 ziplist entry的信息。
* 请注意,这并不是数据的实际编码方式,只是我们为了更容易操作而通过函数填充的内容。
*/
typedef struct zlentry {
unsigned int prevrawlensize; // 前一个entry长度 对应的字节数
unsigned int prevrawlen; // 前一个entry长度
unsigned int lensize; // 用于编码此entry类型/长度 的字节数。 // int long 对应的字节数
// 例如,字符串有一个 1、2 或 5 字节的标头。 整数总是使用一个字节。
unsigned int len; // 用于表示实际entry的长度。
// 对于字符串,这只是字符串长度,
// 而对于整数,它是 1、2、3、4、8 或 0(for 4 bit immediate),具体取决于数字范围。
unsigned int headersize; // prevrawlensize + lensize.
unsigned char encoding; // 当前节点数据编码
// 根据entry编码设置为 ZIP_STR_* 或 ZIP_INT_*。
// 然而,对于 4 bits immediate integers,这可以假定一个值范围并且必须进行范围检查。
unsigned char *p; // 指向条目最开始的指针,即 this 指向 previous-entry-in 字段。
} zlentry;
(4.2) 压缩列表提供的功能
函数名 | 作用 | 算法复杂度 |
---|---|---|
ziplistNew | 创建一个新的 ziplist | O(1) |
ziplistResize | 调整 ziplist 内存大小 | O(N) |
ziplistPush | 将一个包含给定值的新节点插入 ziplist 的表头或者表尾 | O(N^2) |
zipEntry | 取出给定地址上的节点信息 | O(1) |
ziplistInsert | 将一个包含给定值的新节点插入到给定地址 | O(N^2) |
ziplistDelete | 删除给定地址上的节点 | O(N^2) |
ziplistDeleteRange | 在给定索引上,连续进行多次删除 | O(N^2) |
ziplistFind | 在 ziplist 中查找并返回包含给定值的节点 | O(N) |
ziplistLen | 返回 ziplist 保存的节点数量 | O(N) |
ziplistBlobLen | 以字节为单位,返回 ziplist 占用的内存大小 | O(1) |
(4.3) 创建压缩列表
// file: ziplist.c
/* 创建一个空的压缩列表 */
unsigned char *ziplistNew(void) {
// 计算长度
// ZIPLIST_HEADER_SIZE = 32bits + 32bits + 16bits
// ZIPLIST_END_SIZE = 8bits
unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
// 初始分配的大小
unsigned char *zl = zmalloc(bytes);
// 压缩列表总字节长度
ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
// 尾部节点字节距离 结尾下标
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
// 长度设置为0
ZIPLIST_LENGTH(zl) = 0;
// 最后一个字节 设置成 结束符255,标记结束
zl[bytes-1] = ZIP_END;
return zl;
}
ziplistNew
函数的逻辑很简单,就是创建一块连续的内存空间,大小为 ZIPLIST_HEADER_SIZE
(10字节) 和 ZIPLIST_END_SIZE
(1字节) 的总和,然后再把该连续空间的最后一个字节赋值为 ZIP_END,表示列表结束。
//ziplist的列表头大小,包括2个32 bits整数和1个16bits整数,
// 分别表示压缩列表的总字节数,列表最后一个元素的离列表头的偏移,以及列表中的元素个数
/*
* ziplist的列表头大小:2个32 bits整数,用于表示总字节数 和 最后一项偏移量。
* 用1个16 bits的整数表示元素个数
*
*/
#define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))
/*
* ziplist的列表尾大小,只有1字节,表示列表结束。
*/
#define ZIPLIST_END_SIZE (sizeof(uint8_t))
// ziplist的列表尾字节内容
#define ZIP_END 255 /* 特殊的 "ziplist 结尾" 实体。 */
intrev32ifbe
函数为大小端转换函数,统一转换为小端存储。 为什么要进行转换?
因为压缩列表的操作中涉及到的位运算很多,后续的所有位运算都是在小端存储的基础上进行的。
(4.4) 调整压缩列表内存大小
/*
* 调整 ziplist 内存大小
*/
unsigned char *ziplistResize(unsigned char *zl, size_t len) {
// 校验 确保长度< 2^32
assert(len < UINT32_MAX);
// 压缩列表分配内存
zl = zrealloc(zl,len);
// 压缩列表总字节长度
ZIPLIST_BYTES(zl) = intrev32ifbe(len);
// 最后一个字节 设置成 结束符255,标记结束
zl[len-1] = ZIP_END;
return zl;
}
(4.5) 压缩列表新增元素-ziplistPush
1.计算实际插入元素的长度
/*
* 在压缩列表的entry头部或尾部插入元素
*
* @param *zl 压缩列表
* @param *s 新增数据
* @param slen 长度
* @param where
*/
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
unsigned char *p;
// 判断在entry头部或尾部插入元素
p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
// 插入元素
return __ziplistInsert(zl,p,s,slen);
}
/*
* 在entry指定位置插入元素
*
* @param *zl 压缩列表
* @param *p 插入下标的指针
* @param *s 要插入的数据
* @param slen 插入数据长度
*/
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
//
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
unsigned int prevlensize, prevlen = 0;
size_t offset;
int nextdiff = 0;
unsigned char encoding = 0;
long long value = 123456789; /* initialized to avoid warning. Using a value
that is easy to see if for some reason
we use it uninitialized. */
zlentry tail;
// 找出插入的entry的 prevlen。
if (p[0] != ZIP_END) { // 插入位置不在尾部
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
} else { // 插入位置在尾部
// 最后一个entry的指针
unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
if (ptail[0] != ZIP_END) { // 不是结束标识
// 前一个entry的长度
prevlen = zipRawEntryLength(ptail);
}
}
// 查看条目是否可以编码成整数
if (zipTryEncoding(s,slen,&value,&encoding)) {
// 'encoding' 设置为适当的整数编码
reqlen = zipIntSize(encoding);
} else {
// 'encoding' 未受影响,zipStoreEntryEncoding 将使用字符串长度来确定如何对其进行编码。
reqlen = slen;
}
// 需要前一个entry的长度 和 有效载荷的长度
reqlen += zipStorePrevEntryLength(NULL,prevlen); // 加上前一个entry的长度
reqlen += zipStoreEntryEncoding(NULL,encoding,slen); // 加上当前entry编码长度和当前entry数据长度
// 当插入位置不等于尾部时,我们需要确保下一个entry可以在其 prevlen 字段中保存本entry的长度。
int forcelarge = 0;
// 用来判断插入位置元素的prevlen长度和实际所需的prevlen长度是否相等
nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
// 这个是为了修复一个bug ziplist源码确实不太好读呀 😭 详细分析见:https://segmentfault.com/a/1190000018878466
if (nextdiff == -4 && reqlen < 4) {
nextdiff = 0;
forcelarge = 1;
}
// 存储下标,因为重新分配内存可能更改ziplist的地址
offset = p-zl;
// 调整ziplist内存大小 // 当前长度 + 需要申请的长度 + nextdiff
zl = ziplistResize(zl,curlen+reqlen+nextdiff);
p = zl+offset;
// 必要时应用内存移动并更新尾部偏移量。
if (p[0] != ZIP_END) { // 不是结束字符
// 由于 ZIP_END 字节减一 /* Subtract one because of the ZIP_END bytes */
// 将p节点后移(没有移动p节点前一节点长度信息),留出当前节点位置
memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);
/* Encode this entry's raw length in the next entry. */
// 写入p节点前一节点长度信息(要插入节点的长度)
if (forcelarge)
zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
else
zipStorePrevEntryLength(p+reqlen,reqlen);
// 更新尾节点偏移量
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);
/* When the tail contains more than one entry, we need to take
* "nextdiff" in account as well. Otherwise, a change in the
* size of prevlen doesn't have an effect on the *tail* offset. */
zipEntry(p+reqlen, &tail);
if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
} else { // 是结束字符 也就是列表是空的
// 空列表插入,只更新尾节点偏移量
/* This element will be the new tail. */
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
}
// 这个类似于数组前插入元素,整体都要移动,这个复杂度 🍵
// 当 nextdiff != 0 时,下一个节点的prevlen需要改变,所以我们需要在整个 ziplist 中级联更新
if (nextdiff != 0) {
offset = p-zl;
// 连锁更新
zl = __ziplistCascadeUpdate(zl,p+reqlen);
p = zl+offset;
}
// 写入前一节点长度信息
p += zipStorePrevEntryLength(p,prevlen);
// 写入节点编码与长度信息
p += zipStoreEntryEncoding(p,encoding,slen);
// 写入数据
if (ZIP_IS_STR(encoding)) {
memcpy(p,s,slen);
} else {
zipSaveInteger(p,value,encoding);
}
// 增加压缩列表长度
ZIPLIST_INCR_LENGTH(zl,1);
return zl;
}
可以看到,在新增节点时首先会尝试编码,如果编码成数字使用整形数字编码,否则使用字符串编码
但是不同编码占用的内存空间不同,整数编码占用 1/2/3/4/8 字节,字符串编码占用 slen 字节
当在一个元素 A 前插入一个新的元素 B 时,A 的 prevlen 和 prevlensize 都要根据 B 的长度进行相应的变化。
假设 A 的 prevlen 原本只占用 1 字节(也就是 prevlensize 等于 1),而能记录的前一项长度最大为 253 字节。此时,如果 B 的长度超过了 253 字节,A 的 prevlen 就需要使用 5 个字节来记录,这样就需要申请额外的 4 字节空间了。
连锁更新一旦发生,就会导致 ziplist 占用的内存空间要多次重新分配,这就会直接影响到 ziplist 的访问性能。
(4.5.1) 字符串尝试编码为整数-zipTryEncoding
/*
* 检查"entry"指向的字符串是否可以编码为整数。
* 将整数值存储在“v”中,并将其编码存储在“encoding”中。
*
* @param *entry 字符串
* @param entrylen 节点长度
* @param *v 整数值
* @param 编码
*/
int zipTryEncoding(unsigned char *entry, unsigned int entrylen, long long *v, unsigned char *encoding) {
// 字符串转换为整数后的值
long long value;
if (entrylen >= 32 || entrylen == 0) return 0;
// 字符转整形数字(8字节) 转换后的值存储在&value
if (string2ll((char*)entry,entrylen,&value)) {
// 可以对字符串进行编码。 检查可以容纳此值的最小编码类型。
if (value >= 0 && value <= 12) {
*encoding = ZIP_INT_IMM_MIN+value;
} else if (value >= INT8_MIN && value <= INT8_MAX) {
*encoding = ZIP_INT_8B;
} else if (value >= INT16_MIN && value <= INT16_MAX) {
*encoding = ZIP_INT_16B;
} else if (value >= INT24_MIN && value <= INT24_MAX) {
*encoding = ZIP_INT_24B;
} else if (value >= INT32_MIN && value <= INT32_MAX) {
*encoding = ZIP_INT_32B;
} else {
*encoding = ZIP_INT_64B;
}
// 设置值
*v = value;
return 1;
}
// 返回false
return 0;
}
判断字符串是否可以编码为整数
如果可以编码为整数,编码根据大小设置为 ZIP_INT_8B / ZIP_INT_16B / ZIP_INT_24B / ZIP_INT_32B / ZIP_INT_64B,分表占用 1/2/3/4/8 字节内存空间。
(4.5.2) 压缩整数占用内存空间大小-zipIntSize
/*
* 返回存储由'encoding'编码的整数所需的字节数。
*/
unsigned int zipIntSize(unsigned char encoding) {
switch(encoding) {
case ZIP_INT_8B: return 1;
case ZIP_INT_16B: return 2;
case ZIP_INT_24B: return 3;
case ZIP_INT_32B: return 4;
case ZIP_INT_64B: return 8;
}
//
if (encoding >= ZIP_INT_IMM_MIN && encoding <= ZIP_INT_IMM_MAX)
return 0; /* 4 bit immediate */
panic("Invalid integer encoding 0x%02X", encoding);
return 0;
}
ZIP_INT_8B / ZIP_INT_16B / ZIP_INT_24B / ZIP_INT_32B / ZIP_INT_64B,分表占用 1/2/3/4/8 字节内存空间
#define ZIP_INT_16B (0xc0 | 0<<4)
#define ZIP_INT_32B (0xc0 | 1<<4)
#define ZIP_INT_64B (0xc0 | 2<<4)
#define ZIP_INT_24B (0xc0 | 3<<4)
#define ZIP_INT_8B 0xfe
#define ZIP_INT_IMM_MIN 0xf1 /* 11110001 */
#define ZIP_INT_IMM_MAX 0xfd /* 11111101 */
(4.5.3) 前一个节点长度占用内存大小-zipStorePrevEntryLength
/*
* 编码前一个entry的长度并将其写入指针"p"。
* 如果"p"为 NULL,则返回编码此长度所需的字节数。
*
* @param *p
* @param len 长度
*/
unsigned int zipStorePrevEntryLength(unsigned char *p, unsigned int len) {
if (p == NULL) {
// ZIP_BIG_PREVLEN = 254
// len < 254 返回 1字节,否则返回 int占用4字节+1字节=5字节
return (len < ZIP_BIG_PREVLEN) ? 1 : sizeof(len)+1;
} else {
if (len < ZIP_BIG_PREVLEN) { // len < 254
p[0] = len;
// 返回1字节
return 1;
} else {
// 返回对应的大小
return zipStorePrevEntryLengthLarge(p,len);
}
}
}
/*
* 编码前一个entry的长度并将其写入指针"p"。
* 这仅使用较大的编码(__ziplistCascadeUpdate 中需要)。
*/
int zipStorePrevEntryLengthLarge(unsigned char *p, unsigned int len) {
if (p != NULL) {
// 第一个字节设置成 254
p[0] = ZIP_BIG_PREVLEN;
// len是int类型,占用4个字节
// 接下来4个字节保存长度len
memcpy(p+1,&len,sizeof(len));
//
memrev32ifbe(p+1);
}
// 返回 1字节+int占用4字节=5字节
return 1+sizeof(len);
}
根据编码方式的不同, pre_entry_length 域可能占用 1 字节或者 5 字节:
1 字节:如果前一节点的长度小于 254 字节,便使用一个字节保存它的值。
5 字节:如果前一节点的长度大于等于 254 字节,那么将第 1 个字节的值设为 254 ,然后用接下来的4个字节保存实际长度。
253对应的entry
area |<------------------- entry -------------------->|
size 1 byte ? ? ?
+------------------+----------+--------+---------+
component | pre_entry_length | encoding | length | content |
| | | | |
value | 1111 1101 | | | |
+------------------+----------+--------+---------+
1024对应的entry
area |<------------------------------ entry ---------------------------------->|
size 5 bytes ? ? ?
+-------------------------------------------+----------+--------+---------+
component | pre_entry_length | encoding | length | content |
| | | | |
| 11111110 00000000000000000000010000000000 | ? | ? | ? |
+-------------------------------------------+----------+--------+---------+
|<------->|<------------------------------->|
1 byte 4 bytes
(4.5.4) 节点编码占用的内存大小-zipStoreEntryEncoding
/*
* 在指针'p'中写入entry的编码头。
* 如果 p 为 NULL,它只返回编码该长度所需的字节数。
* 参数:
* 'encoding' 用于entry的编码。 对于单字节小立即整数,它可以是 ZIP_INT_* 或 ZIP_STR_* 或介于 ZIP_INT_IMM_MIN 和 ZIP_INT_IMM_MAX 之间。
* 'rawlen' 仅用于 ZIP_STR_* 编码并且是该条目表示的字符串的长度。
*
* 该函数返回存储在指针"p"中的编码/长度标头使用的字节数。
*/
unsigned int zipStoreEntryEncoding(unsigned char *p, unsigned char encoding, unsigned int rawlen) {
// 长度
unsigned char len = 1, buf[5];
if (ZIP_IS_STR(encoding)) { // 字符串编码
// 虽然给定了编码,但它可能没有为字符串设置,所以我们在这里使用原始长度来确定它。
if (rawlen <= 0x3f) { // 字符串长度小于等于63字节(16进制为0x3f)
if (!p) return len;
buf[0] = ZIP_STR_06B | rawlen;
} else if (rawlen <= 0x3fff) { // 字符串长度小于等于16383字节(16进制为0x3fff)
len += 1; // 编码结果是2字节
if (!p) return len;
buf[0] = ZIP_STR_14B | ((rawlen >> 8) & 0x3f);
buf[1] = rawlen & 0xff;
} else { // 字符串长度大于16383字节
len += 4; // 编码结果是5字节
if (!p) return len;
buf[0] = ZIP_STR_32B;
buf[1] = (rawlen >> 24) & 0xff;
buf[2] = (rawlen >> 16) & 0xff;
buf[3] = (rawlen >> 8) & 0xff;
buf[4] = rawlen & 0xff;
}
} else {
// 表示整数编码,因此长度始终为 1。
if (!p) return len;
buf[0] = encoding;
}
// 把长度存储到指针p
memcpy(p,buf,len);
return len;
}
根据字符串不同的长度,分别占用 1/2/5 字节内存空间。
针对不同长度的数据,使用不同大小的元数据信息(encoding),这种方法可以有效地节省内存开销。
(4.5.5) 前一个节点长度内存空间之差-zipPrevLenByteDiff
/*
* 给定一个指针"p"指向 entry 前缀的 prevlen 信息,
* 如果前一个entry的大小发生变化,此函数将返回编码 prevlen 所需的字节数差异。
*
* 如果 A 是现在用于对"prevlen"字段进行编码的字节数。
* 如果前一个元素将更新为大小为"len",则 B 是编码"prevlen"所需的字节数。
* 函数返回 B - A
*
* 因此,如果需要更多空间,该函数返回一个正数,如果需要更少空间,则返回负数,如果需要相同空间,则返回零。
*
* @param *p
* @param len 占用的内存空间大小/长度
*/
int zipPrevLenByteDiff(unsigned char *p, unsigned int len) {
unsigned int prevlensize;
// 前一个节点长度
ZIP_DECODE_PREVLENSIZE(p, prevlensize);
// B - A
return zipStorePrevEntryLength(NULL, len) - prevlensize;
}
(4.5.6) 前一节点长度占用的内存空间-ZIP_DECODE_PREVLENSIZE
/*
* 返回用于对前一个entry的长度进行编码的字节数。
* 通过设置 'prevlensize' 返回长度。
*/
#define ZIP_DECODE_PREVLENSIZE(ptr, prevlensize) do { \
if ((ptr)[0] < ZIP_BIG_PREVLEN) { \
(prevlensize) = 1; \
} else { \
(prevlensize) = 5; \
} \
} while(0)
前一节点长度小于254时,使用1个字节保存前一节点的长度信息。
前一节点长度大于254时,使用5个字节保存前一节点的长度信息。
(4.6) 取指定节点信息
/*
* 返回包含有关entry结构的所有信息。
*/
void zipEntry(unsigned char *p, zlentry *e) {
// 前一哥节点长度信息解析
ZIP_DECODE_PREVLEN(p, e->prevrawlensize, e->prevrawlen);
// 当前节点 数据长度解析 与 编码信息解析
ZIP_DECODE_LENGTH(p + e->prevrawlensize, e->encoding, e->lensize, e->len);
// 设置header长度
e->headersize = e->prevrawlensize + e->lensize;
// 指针指向节点e
e->p = p;
}
(4.7) 插入数据
/*
* 在entry指定位置插入元素
*
* @param *zl 压缩列表
* @param *p 插入下标的指针
* @param *s 要插入的数据
* @param slen 插入数据长度
*/
unsigned char *ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
return __ziplistInsert(zl,p,s,slen);
}
(4.8) 删除数据
/* Delete a single entry from the ziplist, pointed to by *p.
* Also update *p in place, to be able to iterate over the
* ziplist, while deleting entries. */
unsigned char *ziplistDelete(unsigned char *zl, unsigned char **p) {
size_t offset = *p-zl;
//
zl = __ziplistDelete(zl,*p,1);
/* Store pointer to current element in p, because ziplistDelete will
* do a realloc which might result in a different "zl"-pointer.
* When the delete direction is back to front, we might delete the last
* entry and end up with "p" pointing to ZIP_END, so check this. */
*p = zl+offset;
return zl;
}
/* Delete "num" entries, starting at "p". Returns pointer to the ziplist. */
unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
unsigned int i, totlen, deleted = 0;
size_t offset;
int nextdiff = 0;
zlentry first, tail;
zipEntry(p, &first);
for (i = 0; p[0] != ZIP_END && i < num; i++) {
p += zipRawEntryLength(p);
deleted++;
}
totlen = p-first.p; /* Bytes taken by the element(s) to delete. */
if (totlen > 0) {
if (p[0] != ZIP_END) {
/* Storing `prevrawlen` in this entry may increase or decrease the
* number of bytes required compare to the current `prevrawlen`.
* There always is room to store this, because it was previously
* stored by an entry that is now being deleted. */
nextdiff = zipPrevLenByteDiff(p,first.prevrawlen);
/* Note that there is always space when p jumps backward: if
* the new previous entry is large, one of the deleted elements
* had a 5 bytes prevlen header, so there is for sure at least
* 5 bytes free and we need just 4. */
p -= nextdiff;
zipStorePrevEntryLength(p,first.prevrawlen);
/* Update offset for tail */
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))-totlen);
/* When the tail contains more than one entry, we need to take
* "nextdiff" in account as well. Otherwise, a change in the
* size of prevlen doesn't have an effect on the *tail* offset. */
zipEntry(p, &tail);
if (p[tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
/* Move tail to the front of the ziplist */
memmove(first.p,p,
intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1);
} else {
/* The entire tail was deleted. No need to move memory. */
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe((first.p-zl)-first.prevrawlen);
}
/* Resize and update length */
offset = first.p-zl;
zl = ziplistResize(zl, intrev32ifbe(ZIPLIST_BYTES(zl))-totlen+nextdiff);
ZIPLIST_INCR_LENGTH(zl,-deleted);
p = zl+offset;
/* When nextdiff != 0, the raw length of the next entry has changed, so
* we need to cascade the update throughout the ziplist */
if (nextdiff != 0)
zl = __ziplistCascadeUpdate(zl,p);
}
return zl;
}
(4.9) 查找-ziplistFind
要查找列表中间的元素时,ziplist 就得从列表头或列表尾遍历才行。
/*
* 查找指向等于指定节点的节点的指针。
* 在每次比较之间跳过'skip'节点。
* 找不到字段时返回 NULL。
*
* @param *p 节点指针
* @param *vstr
* @param vlen
* @param skip
*/
unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip) {
int skipcnt = 0;
unsigned char vencoding = 0;
long long vll = 0;
// 遍历压缩列表
while (p[0] != ZIP_END) { // 不等于结束符
unsigned int prevlensize, encoding, lensize, len;
unsigned char *q;
ZIP_DECODE_PREVLENSIZE(p, prevlensize);
ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
q = p + prevlensize + lensize;
if (skipcnt == 0) {
/* Compare current entry with specified entry */
if (ZIP_IS_STR(encoding)) { // 如果编码为字符串
// 判断长度相同 并且
if (len == vlen && memcmp(q, vstr, vlen) == 0) {
return p;
}
} else {
/* Find out if the searched field can be encoded. Note that
* we do it only the first time, once done vencoding is set
* to non-zero and vll is set to the integer value. */
if (vencoding == 0) {
// 首次比对时,对传入值进行解码
if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
/* If the entry can't be encoded we set it to
* UCHAR_MAX so that we don't retry again the next
* time. */
vencoding = UCHAR_MAX;
}
/* Must be non-zero by now */
assert(vencoding);
}
/* Compare current entry with specified entry, do it only
* if vencoding != UCHAR_MAX because if there is no encoding
* possible for the field it can't be a valid integer. */
if (vencoding != UCHAR_MAX) {
long long ll = zipLoadInteger(q, encoding);
if (ll == vll) {
return p;
}
}
}
/* Reset skip count */
skipcnt = skip;
} else {
// 跳过entry
skipcnt--;
}
// 指向下一个entry
p = q + len;
}
return NULL;
}
当我们往 ziplist 中插入数据时,ziplist 就会根据数据是字符串还是整数,以及它们的大小进行不同的编码。
ziplist 是如何进行编码呢?
ziplist 列表项包括三部分内容,分别是前一项的长度(prevlen)、当前项长度信息的编码结果(encoding),以及当前项的实际数据(data)。
下面的图展示了列表项的结构
(5) 总结
1、ziplist 设计的初衷就是「节省内存」,在存储数据时,把内存利用率发挥到了极致:
- 数字按「整型」编码存储,比直接当字符串存内存占用少
- 数据「长度」字段,会根据内容的大小选择最小的长度编码
- 甚至对于极小的数据,干脆把内容直接放到了「长度」字段中(前几个位表示长度,后几个位存数据)
2、但 ziplist 的劣势也很明显:
- 寻找元素只能挨个遍历,存储过长数据,查询性能很低
- 每个元素中保存了「上一个」元素的长度(为了方便反向遍历),这会导致上一个元素内容发生修改,长度超过了原来的编码长度,下一个元素的内容也要跟着变,重新分配内存,进而就有可能再次引起下一级的变化,一级级更新下去,频繁申请内存
3、想要缓解 ziplist 的问题,比较简单直接的方案就是,多个数据项,不再用一个 ziplist 来存,而是分拆到多个 ziplist 中,每个 ziplist 用指针串起来,这样修改其中一个数据项,即便发生级联更新,也只会影响这一个 ziplist,其它 ziplist 不受影响,这种方案就是 quicklist:
qucklist: ziplist1(也叫quicklistNode) <-> ziplist2 <-> ziplist3 <-> …
4、List 数据类型底层实现,就是用的 quicklist,因为它是一个链表,所以 LPUSH/LPOP/RPUSH/RPOP 的复杂度是 O(1)
5、List 中每个 ziplist 节点可以存的元素个数/总大小,可以通过 list-max-ziplist-size 配置:
- 正数:ziplist 最多包含几个数据项
- 负数:取值 -1 ~ -5,表示每个 ziplist 存储最大的字节数,默认 -2,每个ziplist 8KB
ziplist 超过上述任一配置,添加新元素就会新建 ziplist 插入到链表中。
6、List 因为更多是两头操作,为了节省内存,还可以把中间的 ziplist「压缩」,具体可看 list-compress-depth 配置项,默认配置不压缩
7、要想彻底解决 ziplist 级联更新问题,本质上要修改 ziplist 的存储结构,也就是不要让每个元素保存「上一个」元素的长度即可,所以才有了 listpack
8、listpack 每个元素项不再保存上一个元素的长度,而是优化元素内字段的顺序,来保证既可以从前也可以向后遍历
9、listpack 是为了替代 ziplist 为设计的,但因为 List/Hash/ZSet 都严重依赖 ziplist,所以这个替换之路很漫长,目前只有 Stream 数据类型用到了 listpack
参考资料
[1] Redis设计与实现-压缩列表
[2] Redis源码剖析与实战 - 04 内存友好的数据结构该如何细化设计?
[3] Redis源码剖析与实战 - 06 | 从ziplist到quicklist,再到listpack的启发
[4] redis-ziplist源码-github