Redis源码剖析--集合t_set

今天来看看Redis的另一个数据类型—集合set。在RedisObject一篇中,有介绍到集合对象的底层有两种编码形式,分别是OBJ_ENCODING_INTSET(底层数据结构为整数集合)和OBJ_ENCODING_HT(底层数据结构为字典),如果对整数集合Intset字典dict不熟悉的,可以点击跳转去复习一下。下面,就一起去剖析一下set的实现源码吧。

Set概述

关于集合set的源码在server.h和t_set.c文件中,与前面分析字符串和列表一样,先来看看它的一些定义。Redis采用RedisObject数据结构来表示它的所有对外开放的数据类型。

1
2
3
4
5
6
7
typedef struct redisObject {
unsigned type:4; // 此字段为OBJ_SET
unsigned encoding:4; // 如果是set结构,编码为OBJ_ENCODING_INTSET或OBJ_ENCODING_HT
unsigned lru:LRU_BITS;
int refcount;
void *ptr;
} robj;

在RedisObject结构中,如果encoding字段为OBJ_ENCODING_INTSET或OBJ_ENCODING_HT时,就说明这个对象表示一个set对象。那么,ptr字段就指向一个整数集合或者字典结构。

Set迭代器

每个数据类型都拥有其迭代器结构,用来遍历集合中的每一个元素,set的迭代器结构定义如下:

1
2
3
4
5
6
typedef struct {
robj *subject; // 指向的set对象
int encoding; // 编码类型
int ii; // 如果是intset,则用下标即可代表迭代器
dictIterator *di; // 如果是ht编码,则采用字典的迭代器
} setTypeIterator;

有了迭代器结构之后,Redis为它提供了一些操作函数,用于迭代操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 初始化迭代器
setTypeIterator *setTypeInitIterator(robj *subject) {
setTypeIterator *si = zmalloc(sizeof(setTypeIterator));
si->subject = subject;
si->encoding = subject->encoding;
if (si->encoding == OBJ_ENCODING_HT) {
si->di = dictGetIterator(subject->ptr); // 获取字典迭代器
} else if (si->encoding == OBJ_ENCODING_INTSET) {
si->ii = 0; // 整数集合的下标
} else {
serverPanic("Unknown set encoding");
}
return si;
}
// 释放迭代器
void setTypeReleaseIterator(setTypeIterator *si) {
if (si->encoding == OBJ_ENCODING_HT)
dictReleaseIterator(si->di);
zfree(si);
}
// 指向下一个迭代器,返回迭代器的编码类型
// 另外objele传出迭代器指向的字典节点对象
// llele传出迭代器指向整数值
int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele) {
if (si->encoding == OBJ_ENCODING_HT) {
dictEntry *de = dictNext(si->di);
if (de == NULL) return -1;
*objele = dictGetKey(de);
*llele = -123456789; /* Not needed. Defensive. */
} else if (si->encoding == OBJ_ENCODING_INTSET) {
if (!intsetGet(si->subject->ptr,si->ii++,llele))
return -1;
*objele = NULL; /* Not needed. Defensive. */
} else {
serverPanic("Wrong set encoding in setTypeNext");
}
return si->encoding;
}

编码转换

我们知道,当一个集合满足只保存整数元素且元素数量不多时,就采用整数集合来存放。所以,当往集合set中添加的元素不是整数类型或者数据量较多时,就需要将内部编码转换为字典结构。Redis的配置文件中提供了set-max-intset-entries参数用来表示整数集合中最大能存放的整数数量。

1
set-max-intset-entries 512 // 默认整数集合intset的最多能存放512个整数

编码转换的功能由setTypeConvert函数实现,其利用set类型迭代器遍历intset中的每一个元素,然后添加到字典dict结构中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 将set类型的intset编码转换成ht编码
void setTypeConvert(robj *setobj, int enc) {
setTypeIterator *si;
// 判断当前编码类型是否为intset
serverAssertWithInfo(NULL,setobj,setobj->type == OBJ_SET &&
setobj->encoding == OBJ_ENCODING_INTSET);
// 判断enc提供的编码类型是不是HT
if (enc == OBJ_ENCODING_HT) {
int64_t intele;
dict *d = dictCreate(&setDictType,NULL);
robj *element;
// 扩张字典结构的大小,避免出现rehash过程
dictExpand(d,intsetLen(setobj->ptr));
// 初始化迭代器,遍历intset,然后依次加入到dict中
si = setTypeInitIterator(setobj);
while (setTypeNext(si,&element,&intele) != -1) {
// 将整数类型转换成字符串对象
element = createStringObjectFromLongLong(intele);
serverAssertWithInfo(NULL,element,
dictAdd(d,element,NULL) == DICT_OK);
}
// 释放迭代器
setTypeReleaseIterator(si);
// 设定编码类型
setobj->encoding = OBJ_ENCODING_HT;
zfree(setobj->ptr);
setobj->ptr = d;
} else {
serverPanic("Unsupported set conversion");
}
}

Set基本接口

set作为Redis的一个数据结构,必然会提供丰富的接口函数,其实现相对简单,和string,list一样,都是调用底层数据结构的接口来实现。下面罗列出所有的接口函数。

1
2
3
4
5
6
7
8
9
10
11
12
// 创建set类型的Redis对象
robj *setTypeCreate(robj *value);
// 往set集合中添加数据
int setTypeAdd(robj *subject, robj *value);
// 移除set集合中的数据
int setTypeRemove(robj *setobj, robj *value);
// 判断指定的value是否存在于set集合中
int setTypeIsMember(robj *subject, robj *value);
// 随机返回set集合中的某个元素
int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele);
// 获取set集合的大小
unsigned long setTypeSize(robj *subject);

以setTypeAdd为例,来分析一下它的源码实现(其他接口函数的源码可以去t_set查看,这里就不再赘述,实现都比较简单)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 添加数据到集合set中
// 如果底层编码是intset需要考虑元素个数是否超过了set_max_intset_entries
int setTypeAdd(robj *subject, robj *value) {
long long llval;
// 如果底层编码是HT,则直接调用字典的添加元素函数
if (subject->encoding == OBJ_ENCODING_HT) {
if (dictAdd(subject->ptr,value,NULL) == DICT_OK) {
incrRefCount(value);
return 1;
}
} else if (subject->encoding == OBJ_ENCODING_INTSET) {
// 编码类型是iNTSET,则需要判断待添加元素的类型
if (isObjectRepresentableAsLongLong(value,&llval) == C_OK) {
// 待添加元素是整数类型
uint8_t success = 0;
// 调用intset的添加元素函数
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
// 判断intset的长度是否超过了set_max_intset_entries
if (intsetLen(subject->ptr) > server.set_max_intset_entries)
// 将底层编码转换成HT
setTypeConvert(subject,OBJ_ENCODING_HT);
return 1;
}
} else {
// 执行到此,说明待添加元素不为整数
// 此时就需要将底层编码转换成HT,因为intset只能存放整数类型
setTypeConvert(subject,OBJ_ENCODING_HT);
// 一样,调用字典的添加函数
serverAssertWithInfo(NULL,value,
dictAdd(subject->ptr,value,NULL) == DICT_OK);
incrRefCount(value);
return 1;
}
} else {
serverPanic("Unknown set encoding");
}
return 0;
}

Set命令

当我们启动一个Redis客户端的时候,如果输入如下指令:

1
127.0.0.1:6379> SADD numbers 1 3 5 // 往集合键中添加元素

那么,此时的编码类型我们也可以查看到:

1
2
127.0.0.1:6379> OBJECT ENCODING numbers
"intset"

知道SADD指令怎么使用之后,就要深入到源码中来了解它的具体实现。SADD命令的实现由saddCommand函数实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 往集合键中添加元素,如果该元素存在,则不做任何处理;反之则添加
void saddCommand(client *c) {
robj *set;
int j, added = 0;
// 查看数据库中是否存在该集合键
set = lookupKeyWrite(c->db,c->argv[1]);
if (set == NULL) {
// 不存在就创建一个
set = setTypeCreate(c->argv[2]);
dbAdd(c->db,c->argv[1],set);
} else {
// 存在就检查它是否是一个集合类型的对象
if (set->type != OBJ_SET) {
addReply(c,shared.wrongtypeerr);
return;
}
}
// 从第三个参数开始,为待添加的函数
for (j = 2; j < c->argc; j++) {
// 试图将待添加的元素编码成字符串类型,以节省内存
c->argv[j] = tryObjectEncoding(c->argv[j]);
// 调用集合set的添加函数
if (setTypeAdd(set,c->argv[j])) added++;
}
// 如果至少添加成功了一个元素
if (added) {
// 发送通知
signalModifiedKey(c->db,c->argv[1]);
// 发送事件通知
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
}
server.dirty += added;
addReplyLongLong(c,added);
}

添加元素的底层实现是不是很简单!是不是!基本上就是一些逻辑判断,调用底层函数等。那么,Redis到底为set提供了多少操作命令呢?我们用一张表格来汇总一下。

命令样式 命令描述
SADD key number1 [number2…] 向集合键中添加一个或多个元素
SCARD key 返回集合键中的元素个数
SMEMBERS key 返回集合中的所有成员
SISMEMBER key member 判断元素是否是集合成员
SPOP key 随机返回并移除一个元素
SRANDMEMBER key [count] 随机返回一个或多个元素
SREM key member [member …] 移除指定的元素
SMOVE source destination member 将元素从集合移至另一个集合
SDIFF key [key …] 将一或多个集合的差集保存至另一集合
SINTER key [key …] 将一或多个集合的交集保存至另一集合
SINTERSTORE destination key [key …] 将一或多个集合的交集存储到新集合
SUNION key [key …] 返回集合的并集
SUNIONSTORE destination key [key …] 将集合的并集插入新集合

这里,我挑两个集合运算的命令进行剖析一下,SDIFF和SUNION,其他的函数都相对比较简单。

集合运算

集合运算包括交集和并集运算,在客户端输入下面的指令会得到一个或多个集合的交集或并集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 设定集合key1的元素
127.0.0.1:6379> SADD key1 1 2 3 4 5
(integer) 5
// 设定集合key2的元素
127.0.0.1:6379> SADD key2 3 4 5 6 7
(integer) 5
// 求集合key1和key2的差集
127.0.0.1:6379> SDIFF key1 key2
1) "1"
2) "2"
// 求集合key1和key2的并集
127.0.0.1:6379> SUNION key1 key2
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
6) "6"
7) "7"

这两个命令分别由底层函数sdiffCommand和sunionCommand实现,源码如下:

1
2
3
4
5
6
7
8
// 并集运算
void sunionCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_UNION);
}
// 差集运算
void sdiffCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_DIFF);
}

从源码中我们可以看到,并集和差集运算都调用了同一个底层函数,也就是并集差集的泛型运算函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#define SET_OP_UNION 0 // 并集运算
#define SET_OP_DIFF 1 // 差集运算
// 并集差集运算的泛型实现
void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
robj *dstkey, int op) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
robj *ele, *dstset = NULL;
int j, cardinality = 0;
int diff_algo = 1;
// 取出所有集合对象,并添加到集合数组中
for (j = 0; j < setnum; j++) {
robj *setobj = dstkey ?
lookupKeyWrite(c->db,setkeys[j]) :
lookupKeyRead(c->db,setkeys[j]);
// 不存在的集合当做NULL处理
if (!setobj) {
sets[j] = NULL;
continue;
}
// 检查取出的对象的类型,有对象不是集合则停止执行,并清理集合数组
if (checkType(c,setobj,OBJ_SET)) {
zfree(sets);
return;
}
// 记录对象
sets[j] = setobj;
}
// Redis提供了两个算法来进行差集运算
// 算法1的复杂度为O(N*M),N表示第一个集合中的元素个数,M表示集合数组中的集合个数
// 算法2的复杂度为O(N),N表示所有集合中的元素个数总和
// 此处需要计算输入来考察选用何种算法比较好
if (op == SET_OP_DIFF && sets[0]) {
long long algo_one_work = 0, algo_two_work = 0;
// 遍历所有集合
for (j = 0; j < setnum; j++) {
if (sets[j] == NULL) continue;
// 计算N*M,也就是setNum*(set[0]中集合的个数)
algo_one_work += setTypeSize(sets[0]);
// 计算所有集合的元素总个数
algo_two_work += setTypeSize(sets[j]);
}
// 算法1的常数比较低,优先选用算法1
algo_one_work /= 2;
diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2;
if (diff_algo == 1 && setnum > 1) {
// 如果选用算法1的话,对除set[0]以外的集合进行排序,这样有助于优化算法的性能
// 如下是比较函数,其实就是比较每个集合的大小来进行排序
/* int qsortCompareSetsByRevCardinality(const void *s1, const void *s2) {
* robj *o1 = *(robj**)s1, *o2 = *(robj**)s2;
*
* return (o2 ? setTypeSize(o2) : 0) - (o1 ? setTypeSize(o1) : 0);
* }
*/
qsort(sets+1,setnum-1,sizeof(robj*),
qsortCompareSetsByRevCardinality);
}
}
// 使用一个临时集合来保存结果集
dstset = createIntsetObject();
if (op == SET_OP_UNION) {
// 执行到此,说明执行的是并集计算
for (j = 0; j < setnum; j++) {
if (!sets[j]) continue; // 空集的话直接跳过
si = setTypeInitIterator(sets[j]);
while((ele = setTypeNextObject(si)) != NULL) {
// 直接把元素加入到结果集,如果元素不存在就添加,返回1,反之不作处理并返回0
if (setTypeAdd(dstset,ele)) cardinality++;
// 引用计数减1,此处会销毁ele
decrRefCount(ele);
}
setTypeReleaseIterator(si);
}
} else if (op == SET_OP_DIFF && sets[0] && diff_algo == 1) {
// 执行到此说明采用算法1来求差集
// 程序遍历集合set[0]中的所有元素
// 并将这个元素和其他集合中的所有元素进行对比
// 只有这个元素不存在于其他集合时,才会将它添加到结果集中
si = setTypeInitIterator(sets[0]);
while((ele = setTypeNextObject(si)) != NULL) {
for (j = 1; j < setnum; j++) {
if (!sets[j]) continue; // 空集合直接跳过
if (sets[j] == sets[0]) break; // 相同的集合直接跳过
if (setTypeIsMember(sets[j],ele)) break; // 查看ele是否存在于此集合
}
if (j == setnum) {
// 此元素不存在其他任何集合中,添加到结果集
setTypeAdd(dstset,ele);
cardinality++;
}
decrRefCount(ele);
}
setTypeReleaseIterator(si);
} else if (op == SET_OP_DIFF && sets[0] && diff_algo == 2) {
// 算法2,遍历所有的集合
// 如果是集合0,则添加到结果集合
// 如果非集合0,则将相同的元素从结果集合中移除
// 算法复杂度为O(N),N为所有集合元素的总个数
for (j = 0; j < setnum; j++) {
if (!sets[j]) continue; // 跳过空集
si = setTypeInitIterator(sets[j]);
while((ele = setTypeNextObject(si)) != NULL) {
if (j == 0) {
// 将set[0]添加到结果集
if (setTypeAdd(dstset,ele)) cardinality++;
} else {
// 将相同的元素从结果集中移除
if (setTypeRemove(dstset,ele)) cardinality--;
}
decrRefCount(ele);
}
setTypeReleaseIterator(si);
// 如果结果集是空的话就退出,因为set[0]中的元素都出现到其他集合中
// 后面的运算也没意义了
if (cardinality == 0) break;
}
}
/* Output the content of the resulting set, if not in STORE mode */
// 输出结果集合
if (!dstkey) {
// 执行到此,说明执行的是SDIFF或SUNION
addReplyMultiBulkLen(c,cardinality);
si = setTypeInitIterator(dstset);
// 遍历并返回结果集合dstset中所有的元素
while((ele = setTypeNextObject(si)) != NULL) {
addReplyBulk(c,ele);
decrRefCount(ele);
}
setTypeReleaseIterator(si);
decrRefCount(dstset);
} else {
// 执行到此,说明执行的是SDIFFSTORE或SUNIONSTORE
int deleted = dbDelete(c->db,dstkey); // 删除数据库中的dstkey
if (setTypeSize(dstset) > 0) {
// 如果结果集合不为空,则关联到数据库
dbAdd(c->db,dstkey,dstset);
addReplyLongLong(c,setTypeSize(dstset));
notifyKeyspaceEvent(NOTIFY_SET,
op == SET_OP_UNION ? "sunionstore" : "sdiffstore",
dstkey,c->db->id);
} else {
// 如果结果集合为空,删除
decrRefCount(dstset);
addReply(c,shared.czero);
if (deleted)
// 发送事件通知
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
dstkey,c->db->id);
}
// 发送消息
signalModifiedKey(c->db,dstkey);
server.dirty++;
}
zfree(sets);
}

上述代码比较长,但是认真看的话就很好懂了,算法的主要思路都写出来了,如有不理解可以在下方留言。

Set小结

本篇博客分析了Set集合的接口函数,迭代器实现以及命令的源码实现,并在最后对交集并集运算的源码进行了分析。目前,我们对Redis的命令执行源码已经有很深的理解了,但是源码中出现的关于通知,网络,数据库这一块,还是一片模糊,只能靠函数名来稍微理解是什么意思,没事,不急,现在我们先知其然,后面必然会知其所以然。还是一样,大家如果有什么疑惑或者建议,可以在留言区留言,一起学习和讨论Redis!

文章作者: Zeech
文章链接: https://zcheng.ren/2016/12/21/TheAnnotatedRedisSourcet-set/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 「 Zeech 」