阅读 114

Redis源码解析之跳跃表(三)

Redis源码解析之跳跃表(三)

我们再来学习如何从跳跃表中查询数据,跳跃表本质上是一个链表,但它允许我们像数组一样定位某个索引区间内的节点,并且与数组不同的是,跳跃表允许我们将头节点L0层的前驱节点(即跳跃表分值最小的节点)zsl->header.level[0].forward当成索引0的节点,尾节点zsl->tail(跳跃表分值最大的节点)当成索引zsl->length-1的节点,索引按分值从小到大递增查询;也允许我们将尾节点当成索引0的节点,头节点L0层的前驱节点当做索引zsl->length-1的节点,索引按分值从大到小递增查询。当我们调用下面的方法按照索引区间来查询时,会把我们的索引转换成跨度,然后查找落在跨度的第一个节点,之后根据reverse(逆向状态)决定是要正向查询还是逆向查询。

假设我们要进行正向查询(即:索引按分值从小到大递增查询),给定的索引区间是[0,2],那么我们要找到跨度为1的节点,然后从跨度为1的节点L0层逐个递进,直到停留在跨度为3的节点,头节点L0层的前驱节点zsl->header.level[0].forward在跳跃表的索引为0,跨度为1,在跨度为1的节点从L0层逐个递进,一直递进到跨度为3的节点,这样便完成了索引区间[0,2]的查询。如果我们要进行逆向查询(即:索引按分值从大到小递增查询),索引区间依旧是[0,2],那么我们要找到跨度为跨度为zsl->length-0=zsl->length的节点,那自然是尾节点,找到跨度区间的第一个节点后,我们通过backward指针逐个后退,一直后退到跨度为zsl->length-2的节点,如此便完成查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
void zrangeGenericCommand(client *c, int reverse) {
    robj *key = c->argv[1];
    robj *zobj;
    int withscores = 0;
    long start;
    long end;
    long llen;
    long rangelen;
 
    //读取起始索引和终止索引,如果存在一个索引读取失败,则退出
    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK))
        return;
    //判断是否要返回分值
    if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr, "withscores")) {
        withscores = 1;
    else if (c->argc >= 5) {
        addReply(c, shared.syntaxerr);
        return;
    }
    //判断key是否存在,如果不存在则退出,如果存在但类型不为ZSET也退出。
    if ((zobj = lookupKeyReadOrReply(c, key, shared.emptyarray)) == NULL
        || checkType(c, zobj, OBJ_ZSET))
        return;
 
    /*
     * Sanitize indexes.
     * 审查索引,这里主要针对传入索引为负数的情况,大家都知道,如果一个
     * 跳跃表的节点个数为N,我们要从起始节点查询到末尾节点,可以用[0,N-1]
     * 或者[0,-1],当传入的end<0时,这里会重新规正end的索引,llen为zset的长度,
     * 因此查询[0,-1],这里会规正为[0,N-1]。同理,start也会被规正,如果我们查询
     * [-5,-3],即代表查询有序集合倒数第5个节点至倒数第三个节点,前提是N>=5这个
     * 查询才有意义。如果我们的起始索引传入的是一个绝对值>N的负数,那么llen + start的
     * 结果也为负数,如果判断start<0,则start会被规正为0。
     * */
    llen = zsetLength(zobj);
    if (start < 0) start = llen + start;
    if (end < 0) end = llen + end;
    if (start < 0) start = 0;
 
    /*
     * Invariant: start >= 0, so this test will be true when end < 0.
     * The range is empty when start > end or start >= length.
     * 如果起始索引大于终止索引,或者起始索引大于等于有序集合节点数量,则直接
     * 返回空数组。
     * */
    if (start > end || start >= llen) {
        addReply(c, shared.emptyarray);
        return;
    }
    //如果判断终止索引大于等于节点数,则规整为llen-1
    if (end >= llen) end = llen - 1;
    //计算要返回的节点数
    rangelen = (end - start) + 1;
    //……
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        //压缩列表逻辑……
    else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;
        sds ele;
 
        /*
         * Check if starting point is trivial, before doing log(N) lookup.
         * 这里会根据给定的开始索引,查找该索引对应的节点,并将ln指向该节点。
         * 需要注意的一点是:平常我们都认为header.level[0].forward指向的节点,在跳跃表
         * 中的索引为0,但有时候跳跃表的末尾节点zsl->tail的索引值也有可能为0,这里就要提到
         * 逆向查询。
         * 当我们使用ZRANGE key min max [WITHSCORES]命令查询时,成员的位置是按照其分值
         * 从小到大来排序,这时候header.level[0].forward的索引值为0,
         * header.level[0].forward.level[0].forward的索引值为1。而zls->tail的索引值
         * 为zls->length-1。
         * 当我们使用ZREVRANGE key start stop [WITHSCORES]命令查询时,成员的位置是按照
         * 其分值从大到小来排序,这时候zls->tail的索引值为0,header.level[0].forward的
         * 索引值为zls->length-1,header.level[0].forward.level[0].forward的索引值为
         * zls->length-2。当reverse为1时,本次查询即为逆向查询。
         * 我们注意到不管是if还是else分值,只要start>0,最终都会执行zslGetElementByRank()
         * 将ln定位到起始节点。当start为0时,如果是逆向查询,则索引0的位置是尾节点zsl->tail,
         * 如果是正向查询,索引0的位置则是zsl->header->level[0].forward。
         * 那么(llen - start)和(start + 1)又代表什么含义呢?为什么zslGetElementByRank()
         * 可以根据这两个公式的计算结果,定位到索引对应的节点呢?其实这两个公式计算的是跨度,而
         * zslGetElementByRank()则是根据给定的跳跃表和跨度查找节点而已。
         * 如果是正常查询,假设起始索引为0,则跨度为start(0)+1=1,刚好为头节点L0层到达第一个节点的
         * 跨度为1;如果起始索引为1,则跨度为start(1)+1=2,刚好是头节点到达索引值为1的节点的跨度。
         * 如果是逆向查询,索引值为0代表尾节点,而llen-start(0)=llen为头节点到达尾节点的跨度;同理,
         * 倒数第二个节点的索引值为1,头节点到达倒数第二个节点的跨度为llen-start(1)=llen-1。
         * */
        if (reverse) {
            ln = zsl->tail;
            if (start > 0)
                ln = zslGetElementByRank(zsl, llen - start);
        else {
            ln = zsl->header->level[0].forward;
            if (start > 0)
                ln = zslGetElementByRank(zsl, start + 1);
        }
        //定位到起始节点后,根据逆向状态,不为0时后退查询(ln-backward),为0时递进查询(ln->level[0].forward)
        while (rangelen--) {
            serverAssertWithInfo(c, zobj, ln != NULL);
            ele = ln->ele;
            if (withscores && c->resp > 2) addReplyArrayLen(c, 2);
            addReplyBulkCBuffer(c, ele, sdslen(ele));
            if (withscores) addReplyDouble(c, ln->score);
            ln = reverse ? ln->backward : ln->level[0].forward;
        }
    else {
        serverPanic("Unknown sorted set encoding");
    }
}

  

查找到跨度对应的节点,查找到跨度对应的节点,则在<1>处返回,如果我们传入的跨度大于头节点到尾节点的跨度,则返回NULL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/*
 * Finds an element by its rank. The rank argument needs to be 1-based.
 * */
zskiplistNode *zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
    zskiplistNode *x;
    unsigned long traversed = 0;//累计跨度
    int i;
 
    x = zsl->header;
    /*
     * 从头节点的最高层出发,如果基于当前层能够递进到前一个节点,
     * 则把当前节点的跨度加到traversed。
     */
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward && (traversed + x->level[i].span) <= rank) {
            traversed += x->level[i].span;
            x = x->level[i].forward;
        }
        /*
         * 如果累计跨度与调用方传入的跨度相等,则代表x已经前进到调用方
         * 所要求达到的跨度的节点,返回x。
         */
        if (traversed == rank) {
            return x;//<1>
        }
    }
    //如果传入的跨度大于头节点到尾节点的跨度,则返回NULL。
    return NULL;
}

   

跳跃表除了可以根据索引区间来查询,还可以根据分值区间来查询,这里我们又见到了结构体zrangespec。当我们需要判断一个节点是否落在我们指定的分值区间内,需要调用zslValueGteMin()和zslValueLteMax(),当传入一个指定的分值和区间,zslValueGteMin()和zslValueLteMax()的结果不为0,则表明节点落在分值区间内。此外,这两个方法还可以判断一个跳跃表是否和区间有交集,比如调用zslValueGteMin()时,传入尾节点(跳跃表分值最大的节点)及一个指定区间,如果尾节点没有落在指定区间,代表此区间都大于尾节点,此时我们不需要遍历跳跃表即可返回一个空数组,告诉客户端在指定区间内找不到任何节点;同理,调用zslValueLteMax()时传入头节点L0层的前驱节点(跳跃表分值最小节点)没有落在区间内,则表明区间小于跳跃表,同样不需要遍历跳跃表即可返回空数组给客户端,告诉客户端在指定区间内找不到任何节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/*
 * Struct to hold a inclusive/exclusive range spec by score comparison.
 * 此结构体用于表示一个指定区间,minex为0时表示在进行最小值比较时,要包含最小值本身
 * 同理maxex为0时表示进行最大值比较时,要包含最大值本身。
 * 比如:min=2,max=9
 * 当minex=0,maxex=0时,区间为:[2,9]
 * 当minex=1,maxex=0时,区间为:(2,9]
 * 当minex=0,maxex=1时,区间为:[2,9)
 * 当minex=1,maxex=1时,区间为:(2,9)
 * */
typedef struct {
    double min, max;
    int minex, maxex; /* are min or max exclusive? */
} zrangespec;
 
/*
 * 如果spec->minex不为0,返回分值是否大于区间最小值的比较结果,
 * 为0则返回分值是否大于等于区间最小值的比较结果。
 * 如果传入一个跳跃表尾节点的分值zsl->tail.score(即:跳跃表最大分值)和区间返回结果为0,
 * 则表示跳跃表和区间没有交集。
 * 这里分两种情况:
 * spec->minex不为0:区间要查询分值大于spec->min的元素,
 * zsl->tail.score<=spec->min代表跳跃表最大分值小于等于min,返回结果为0。
 * spec->minex为0:区间要查询分值大于等于spec->min的元素,
 * zsl->tail.score<spec->min代表跳跃表最大分值小于min,返回结果为0。
 */
int zslValueGteMin(double value, zrangespec *spec) {
    return spec->minex ? (value > spec->min) : (value >= spec->min);
}
 
/*
 * 如果spec->maxex不为0,返回分值是否小于区间最大值的比较结果,为0则返回分值
 * 是否小于等于区间最大值的比较结果。
 * 如果传入一个跳跃表头节点L0层指向节点的分值
 * zsl->header.level[0].forward.score(即:跳跃表最小分值)和
 * 区间返回结果为0,则表示跳跃表和区间没有交集。
 * 这里分两种情况:
 * spec->maxex不为0:区间要查询分值小于spec->max的元素,
 * zsl->header.level[0].forward.score>=spec->max代表跳跃表最小分值大于等于区间
 * 最大分值,返回结果为0。
 * spec->maxex为0:区间要查询分值小于等于spec->max的元素,
 * zsl->header.level[0].forward.score>spec->max代表跳跃表最小分值大于区间最大分值,
 * 返回结果为0。
 */
int zslValueLteMax(double value, zrangespec *spec) {
    return spec->maxex ? (value < spec->max) : (value <= spec->max);
}

    

在真正根据分值区间查询跳跃表前,会校验区间是否有效,如果我们输入一个区间[a,b],但a>b,那么这个区间肯定是无效区间,无须遍历跳跃表;如果a=b,如果区间的开闭状态出现:(a,b)、(a,b]、[a,b)这三种情况,也是无效区间,只有[a,b]才会去查询节点,表示需要查找分值为a(或者b)的节点。当校验完区间是有效后,还会调用zslValueGteMin()和zslValueLteMax()判断跳跃表和区间是否存在交集,即区间是否整体大于跳跃表或整体小于跳跃表,如果出现这两种情况则表明区间和跳跃表无交集,也就不需要遍历。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/*
 * Returns if there is a part of the zset is in range.
 * 判断跳跃表和区间是否存在交集
 * */
int zslIsInRange(zskiplist *zsl, zrangespec *range) {
    zskiplistNode *x;
 
    /*
     * Test for ranges that will always be empty.
     * 校验区间范围是否有效,无效则返回0表示查询结果为空:
     * 1.如果最小值大于最大值,则无效。
     * 2.如果最小值等于最大值,且区间为:(min,max)、(min,max]、[min,max)则无效。
     * */
    if (range->min > range->max ||
        (range->min == range->max && (range->minex || range->maxex)))
        return 0;
    /*
     * 如果尾节点不为NULL,则把跳跃表最大分值zsl->tail.score与区间比较,
     * 如果range->minex不为0,则查询分值大于range->min的元素,如果跳跃表
     * 最大分值zsl->tail.score小于等于range->min,则表示跳跃表和区间没有交集,
     * 无须遍历跳跃表查询;同理如果range->minex为0,则查询分值大于等于range->min
     * 的元素,如果zsl->tail.score小于range->min,则表示跳跃表和区间没有交集,
     * 也无须遍历跳跃表查询。
     */
    x = zsl->tail;
    if (x == NULL || !zslValueGteMin(x->score, range))
        return 0;
    /*
     * 如果头节点L0层的前驱节点不为NULL,则把跳跃表最小分值zsl->header->level[0].forward.score
     * 与区间比较,如果range->maxex不为0,则查询分值小于range->maxex的元素,如果跳跃表最小
     * 分值zsl->header->level[0].forward.score大于等于range->max,则表示跳跃表和区间没有交集,
     * 无须遍历跳跃表查询;同理如果range->maxex为0,则查询分值小于等于range->maxex的元素,如果跳跃表
     * 最小分值zsl->header->level[0].forward.score大于range->maxex,则表示跳跃表和区间没有交集,
     * 也无须遍历跳跃表查询。
     */
    x = zsl->header->level[0].forward;
    if (x == NULL || !zslValueLteMax(x->score, range))
        return 0;
    //跳跃表和区间存在交集,需要遍历跳跃表查询。
    return 1;
}

  

跳跃表允许我们根据分值区间进行正向查询(分值从小到大)或逆向查询(分值从大到小),如果是正向查询,则调用zslFirstInRange()方法,先判断跳跃表和指定区间是否存在交集,如果存在则查找指定区间内的分值最小的节点并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/*
 * Find the first node that is contained in the specified range.
 * Returns NULL when no element is contained in the range.
 * 查找落在指定区间的第一个节点,如果没有元素落在这个区间则返回NULL。
 * */
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
    zskiplistNode *x;
    int i;
 
    /*
     * If everything is out of range, return early.
     * 如果跳跃表和区间没有交集则无须遍历,直接返回NULL。
     * */
    if (!zslIsInRange(zsl, range)) return NULL;
    //从头节点的最高层开始遍历
    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        /*
         * Go forward while *OUT* of range.
         * 如果x->level[i].forward不为NULL,根据其分值x->level[i].forward->score和
         * range->minex判断前驱节点是否能前进。
         * 这里分两种情况:
         * range->minex不为0:判断前驱节点的分值是否大于range->min,如果小于等于的话
         * expression=zslValueGteMin(x->level[i].forward->score, range)为0,
         * 代表需要前进,找到大于min的节点,而!expression为1,while条件成立,x会
         * 前进到它的前驱节点。当x的前驱节点的分值大于min,就会停止循环,x会停留在区间内
         * 第一个节点的后继节点。
         * range->minex为0:判断前驱节点的分值是否大于等于range->min,如果小于的话,
         * expression=zslValueGteMin(x->level[i].forward->score, range),expression为0,
         * 需要前进,找到大于等于min的节点,而!expression为1,while条件成立,x会
         * 前进到它的前驱节点。当x的前驱节点的分值大于等于min,就会停止循环,x会停留在区间内
         * 第一个节点的后继节点。
         * */
        while (x->level[i].forward &&
               !zslValueGteMin(x->level[i].forward->score, range))
            x = x->level[i].forward;
    }
 
    /*
     * This is an inner range, so the next node cannot be NULL.
     * 上面的循环会让x停留在区间内第一个节点的后继节点,为了达到区间内的
     * 第一个节点,x要在L0层前进到它的前驱节点。
     * */
    x = x->level[0].forward;
    serverAssert(x != NULL);
 
    /*
     * Check if score <= max.
     * range->maxex不为0:如果区间内第一个节点的分值大于等于spec->max,
     * expression=zslValueLteMax(x->score, range),expression结果为0
     * !expression为1,表示查询异常,返回NULL。
     * range->maxex为0:如果区间内第一个节点的分值大于spec->max,
     * 则expression=zslValueLteMax(x->score, range),expression结果为0
     * !expression为1,表示查询异常,返回NULL。
     * */
    if (!zslValueLteMax(x->score, range)) return NULL;
    return x;
}

   

如果要进行逆向查询,则调用zslLastInRange(),这里同样先判断跳跃表是否和区间存在交集,只有存在交集才会进行下一步的判断,查找指定区间内分值最大的节点并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/*
 * Find the last node that is contained in the specified range.
 * Returns NULL when no element is contained in the range.
 * 查找落在指定区间的最后一个节点,如果没有元素落在这个区间则返回NULL。
 * */
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
    zskiplistNode *x;
    int i;
 
    /*If everything is out of range, return early.*/
    if (!zslIsInRange(zsl, range)) return NULL;<br>
    //从头节点的最高层开始遍历
    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        /*
         * Go forward while *IN* range.
         * 根据区间range和前驱节点的分值判断是否前进,如果x->level[i].forward
         * 不为NULL,根据range->maxex判断前驱节点是否能前进。
         * 这里分两种情况:
         * 如果range->maxex不为0,且前驱节点的分值小于range->max,则可以前进。
         * 如果range->maxex为0,且前驱节点的分值小于等于range->max,则可以前进。
         * */
        while (x->level[i].forward &&
               zslValueLteMax(x->level[i].forward->score, range))
            x = x->level[i].forward;
    }
 
    /* This is an inner range, so this node cannot be NULL. */
    serverAssert(x != NULL);
 
    /*
     * Check if score >= min.
     * 如果range->minex不为0,x的分值小于或等于range->min,代表查询出现异常,则返回NULL。
     * 如果range->minex为0,x的分值小于range->min,代表查询出现异常,则返回NULL。
     * */
    if (!zslValueGteMin(x->score, range)) return NULL;
    return x;
}

   

在了解完上面的内容后,下面我们要步入正题:如何根据分值区间进行正向或逆向查找节点。在下面代码<1>处,会根据逆向状态选择ln是指向区间分值最大的节点,或是分值最小的节点。在定位到起始节点后,会在<2>处的while循环对节点进行偏移,如果到达偏移位置后的ln不为NULL,则会进入<3>处的while循环,查找分值落在区间内的节点,这里会根据逆向状态是否不为0,决定是用backward指针后退,还是向L0层的前驱节点递进,一直到分值不落在区间内跳出while循环,或者ln为NULL,又或者limit为0结束while循环。如果我们没有指定偏移(offset)和返回数量(limit),则不会进行偏移,limit默认值为-1,limit--永远不为0,这里会返回落在区间内的所有节点,能结束while循环只有遇到分值不落在区间内的节点,或者是ln为NULL。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
void genericZrangebyscoreCommand(client *c, int reverse) {
    zrangespec range;//指定区间
    robj *key = c->argv[1];
    robj *zobj;
    long offset = 0, limit = -1;//偏移和结果返回数量
    int withscores = 0;
    unsigned long rangelen = 0;
    void *replylen = NULL;
    int minidx, maxidx;
 
    /*
     * Parse the range arguments.
     * 解析范围参数
     * ZRANGEBYSCORE key min max和ZREVRANGEBYSCORE key max min两个命令
     * 都是此函数实现的,如果客户端输入的命令为ZRANGEBYSCORE,则reverse为0,按
     * 从小到大查找分值及元素,分值小的在前,分值大的在后。如果客户端输入的命令为
     * ZREVRANGEBYSCORE,则reverse不为0,按从大到小查找分值及元素,分值大的在前,
     * 分值小的在后。
     *
     * */
    if (reverse) {
        /* Range is given as [max,min] */
        maxidx = 2;
        minidx = 3;
    else {
        /* Range is given as [min,max] */
        minidx = 2;
        maxidx = 3;
    }
 
    if (zslParseRange(c->argv[minidx], c->argv[maxidx], &range) != C_OK) {
        addReplyError(c, "min or max is not a float");
        return;
    }
 
    /*
     * Parse optional extra arguments. Note that ZCOUNT will exactly have
     * 4 arguments, so we'll never enter the following code path.
     * 遍历可选参数,这里会判断是否要返回分值(withscores),是否要对查询结果进行偏移(offset)和数量(limit)的限制
     * */
    if (c->argc > 4) {
        int remaining = c->argc - 4;
        int pos = 4;
 
        while (remaining) {
            if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr, "withscores")) {
                pos++;
                remaining--;
                withscores = 1;
            else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr, "limit")) {
                if ((getLongFromObjectOrReply(c, c->argv[pos + 1], &offset, NULL)
                     != C_OK) ||
                    (getLongFromObjectOrReply(c, c->argv[pos + 2], &limit, NULL)
                     != C_OK)) {
                    return;
                }
                pos += 3;
                remaining -= 3;
            else {
                addReply(c, shared.syntaxerr);
                return;
            }
        }
    }
 
    /*
     * Ok, lookup the key and get the range
     * 如果key所对应的zobj不存在,或者zobj的类型不为zset,则退出。
     * */
    if ((zobj = lookupKeyReadOrReply(c, key, shared.emptyarray)) == NULL ||
        checkType(c, zobj, OBJ_ZSET))
        return;
 
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        //压缩列表流程...
    else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {//zobj类型为跳跃表
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;
 
        /*
         * If reversed, get the last node in range as starting point.
         * 如果是逆向查询,ln会指向区间分值最大的节点,如果是正向查询,ln则指向区间分值最小的节点。
         * */
        if (reverse) {
            ln = zslLastInRange(zsl, &range);
        else {
            ln = zslFirstInRange(zsl, &range);
        }
 
        /*
         * No "first" element in the specified interval.
         * 如果没有落在区间的开始节点则退出
         * */
        if (ln == NULL) {
            addReply(c, shared.emptyarray);
            return;
        }
 
        /* We don't know in advance how many matching elements there are in the
         * list, so we push this object that will represent the multi-bulk
         * length in the output buffer, and will "fix" it later */
        replylen = addReplyDeferredLen(c);
 
        /*
         * If there is an offset, just traverse the number of elements without
         * checking the score because that is done in the next loop.
         * <2>如果有偏移量,则根据reverse状态选择是后退还是递进,以达到偏移量。
         * 如果客户端有传入偏移,则offset不为0,这里会循环到offset为0或ln为NULL时跳出循环。
         * 否则offset默认值为0,不会进入此循环。
         * */
        while (ln && offset--) {
            if (reverse) {
                ln = ln->backward;
            else {
                ln = ln->level[0].forward;
            }
        }
        /*
         * <3>如果客户端有传入偏移和数量,则limit不为0,此时会根据reverse状态后退或者前进至
         * ln为NULL或者limit为0,否则limit默认值为-1,limit--永远为true(注:只要limit
         * 不为0,则永远为true,即便是负数),这里就会循环到ln为NULL时,获取所有分值符合区间
         * 节点的。
         * 除了limit为0,或者ln为NULL会跳出while循环,在<4>处还会根据reverse状态判断分值是否在区间,
         * 如果不在则跳出循环,如果分值符合区间,还会在<5>处根据reverse状态选择是后退到后一个节点(ln->backward),
         * 还是前进到前一个节点(ln->level[0].forward)。
         */
        while (ln && limit--) {
            /*Abort when the node is no longer in range.*/
            if (reverse) {//<4>
                if (!zslValueGteMin(ln->score, &range)) break;
            else {
                if (!zslValueLteMax(ln->score, &range)) break;
            }
 
            rangelen++;
            if (withscores && c->resp > 2) addReplyArrayLen(c, 2);
            addReplyBulkCBuffer(c, ln->ele, sdslen(ln->ele));
            if (withscores) addReplyDouble(c, ln->score);
 
            /* Move to next node */
            if (reverse) {//<5>
                ln = ln->backward;
            else {
                ln = ln->level[0].forward;
            }
        }
    else {
        serverPanic("Unknown sorted set encoding");
    }
 
    if (withscores && c->resp == 2) rangelen *= 2;
    setDeferredArrayLen(c, replylen, rangelen);
}

  

最后,我们要了解如何获取一个元素在跳跃表中的索引,其实这里面的逻辑也是非常的简单,我们先从字典上获取节点的分值,然后根据分值及元素获取其在跳跃表中的索引,这里依旧支持正向查询或逆向查询,如果是正向查询,分值越小,索引越小,如果分值相等,则元素越小,索引越小;如果是逆向查询,则分值越大,索引越小,如果分值相等,则元素越大,索引越小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/* Given a sorted set object returns the 0-based rank of the object or
 * -1 if the object does not exist.
 * 返回元素在有序集合中的索引,如果返回-1则代表元素不在有序集合内。
 *
 * For rank we mean the position of the element in the sorted collection
 * of elements. So the first element has rank 0, the second rank 1, and so
 * forth up to length-1 elements.
 * 在跳跃表中第一个元素的索引为0,第二个元素索引为1,以此类推,最后一个元素索引为length-1。
 *
 * If 'reverse' is false, the rank is returned considering as first element
 * the one with the lowest score. Otherwise if 'reverse' is non-zero
 * the rank is computed considering as element with rank 0 the one with
 * the highest score.
 * 如果reverse为0,跳跃表索引从分值最小的节点开始,即zsl->header.level[0].forward索引为0、
 * zsl->header.level[0].forward.level[0].forward索引为1,zsl->tail索引为zsl-length-1;
 * 如果reverse不为0,跳跃表索引从分值最大的节点开始,即zsl->tail索引为0,zsl->tail.backward索引
 * 为1,zsl->header.level[0].forward索引为zsl->length-1
 * */
long zsetRank(robj *zobj, sds ele, int reverse) {
    unsigned long llen;
    unsigned long rank;
 
    llen = zsetLength(zobj);
 
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        //压缩列表逻辑……
    else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        dictEntry *de;
        double score;
 
        de = dictFind(zs->dict, ele);
        if (de != NULL) {
            /*
             * 如果元素存在在跳跃表上,则获取元素的分支,并根据
             * 分支判断其在跳跃表中的跨度,根据跨度计算节点在跳跃表
             * 中的索引。
             * 如果是正向查询(reverse为0),则索引为跨度(rank)-1。
             * 如果是逆向查询(reverse不为0),则索引为跳跃表长度(zsl->length)-跨度(rank)。
             */
            score = *(double *) dictGetVal(de);
            rank = zslGetRank(zsl, score, ele);
            /* Existing elements always have a rank. */
            serverAssert(rank != 0);
            if (reverse)
                return llen - rank;
            else
                return rank - 1;
        else {//如果元素不在跳跃表上,则返回-1
            return -1;
        }
    else {
        serverPanic("Unknown sorted set encoding");
    }
}

  

获取完分值后,需要定位节点在跳跃表中的跨度,然后根据逆向状态及跨度,计算节点在跳跃表中的索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/* Find the rank for an element by both score and key.
 * Returns 0 when the element cannot be found, rank otherwise.
 * Note that the rank is 1-based due to the span of zsl->header to the
 * first element.
 * 根据给定的分支和元素查找其节点在跳跃表中的跨度,返回0代表节点不存在。
 * */
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;
    //从头节点最高层遍历,如果能前进到前一个节点,则把当前节点的跨度加到rank上
    x = zsl->header;
    for (i = zsl->level - 1; i >= 0; i--) {
        while (x->level[i].forward &&
               (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                 sdscmp(x->level[i].forward->ele, ele) <= 0))) {
            rank += x->level[i].span;
            x = x->level[i].forward;
        }
 
        /*
         * x might be equal to zsl->header, so test if obj is non-NULL
         * x可能停留在头节点,此处判断是保证节点的元素不为NULL。
         * */
        if (x->ele && sdscmp(x->ele, ele) == 0) {
            return rank;
        }
    }
    return 0;
}

  

至此,笔者和大家一起学习了Redis跳跃表的是如何插入节点、删除节点、更新节点以及如何对跳跃表中的节点进行不同维度(索引、分值)的查询。跳跃表是一种应用相当广泛的数据结构,很多场景下人们都用跳跃表代替B-Tree,因为跳跃表和B-Tree有着一样的查询时间复杂度O(logN),但跳跃表的实现却比B-Tree简单很多。而Redis正是借助了跳跃表的思路实现了有序集合,使得很多需要存储、排序海量数据的业务得以实现,如:微博热搜或者头条新闻,都可以使用Redis有序集合来解决。

当然,由于笔者的时间精力有限,这里并没有完全介绍所有跳跃表命令的相关实现,但笔者相信能看到这里的人,基本已经掌握了跳跃表的整体脉络。如果对跳跃表其余命令有兴趣的朋友,可以自行翻阅Redis源码,或者评论私信笔者你们的疑问,如果问题多的话笔者还会针对大家共同的问题进行讲解。

来源https://www.cnblogs.com/beiluowuzheng/p/14883192.html

服务器评测 http://www.cncsto.com/ 

服务器测评 http://www.cncsto.com/ 

站长资源 https://www.cscnn.com/ 


文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐