1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Redis(八):zset/zadd/zrange/zrembyscore 命令源碼解析

        共 39775字,需瀏覽 80分鐘

         ·

        2021-03-06 05:54

        走過路過不要錯過

        點擊藍字關注我們


        前面幾篇文章,我們完全領略了redis的string,hash,list,set數(shù)據(jù)類型的實現(xiàn)方法,相信對redis已經(jīng)不再神秘。

        本篇我們將介紹redis的最后一種數(shù)據(jù)類型: zset 的相關實現(xiàn)。

         本篇過后,我們對redis的各種基礎功能,應該不會再有疑惑。有可能的話,我們后續(xù)將會對redis的高級功能的實現(xiàn)做解析。(如復制、哨兵模式、集群模式)

         回歸本篇主題,zset。zset 又稱有序集合(sorted set),即是序版本的set。經(jīng)過上篇的介紹,大家可以看到,redis的讀取功能相當有限,許多是基于隨機數(shù)的方式進行讀取,其原因就是set是無序的。當set有序之后,查詢能力就會得到極大的提升。

        1. 可以根據(jù)下標進行定位元素;

        2. 可以范圍查詢元素; 這是有序帶來的好處。

        那么,我們不妨先思考一下,如何實現(xiàn)有序?兩種方法:1. 根據(jù)添加順序定義,1、2、3... ; 2. 自定義排序值; 第1種方法實現(xiàn)簡單,添加時復雜度小,但是功能受限;第2種方法相對自由,對于每次插入都可能涉及重排序問題,但是查詢相對穩(wěn)定,可以不必完全受限于系統(tǒng)實現(xiàn);

        同樣,我們以功能列表,到數(shù)據(jù)結(jié)構(gòu),再功能實現(xiàn)的思路,來解析redis的zset有序集合的實現(xiàn)方式吧。

        零、redis zset相關操作方法

        zset: Redis 有序集合是string類型元素的集合,且不允許重復的成員。每個元素都會關聯(lián)一個double類型的分數(shù),通過分數(shù)來為集合中的成員進行從小到大的排序。

        使用場景如: 保存任務隊列,該隊列由后臺定時掃描; 排行榜;

        從官方手冊上查到相關使用方法如下:

        1> ZADD key score1 member1 [score2 member2]
        功能: 向有序集合添加一個或多個成員,或者更新已存在成員的分數(shù)
        返回值: 添加成功的元素個數(shù)(已存在的添加不成功)

        2> ZCARD key
        功能: 獲取有序集合的成員數(shù)
        返回值: 元素個數(shù)或0

        3> ZCOUNT key min max
        功能: 計算在有序集合中指定區(qū)間分數(shù)的成員數(shù)
        返回值: 區(qū)間內(nèi)的元素個數(shù)

        4> ZINCRBY key increment member
        功能: 有序集合中對指定成員的分數(shù)加上增量 increment
        返回值: member增加后的分數(shù)

        5> ZINTERSTORE destination numkeys key [key ...]
        功能: 計算給定的一個或多個有序集的交集并將結(jié)果集存儲在新的有序集合 key 中
        返回值: 交集元素個數(shù)

        6> ZLEXCOUNT key min max
        功能: 在有序集合中計算指定字典區(qū)間內(nèi)成員數(shù)量
        返回值: 區(qū)間內(nèi)的元素個數(shù)

        7> ZRANGE key start stop [WITHSCORES]
        功能: 通過索引區(qū)間返回有序集合指定區(qū)間內(nèi)的成員
        返回值: 區(qū)間內(nèi)元素列表

        8> ZRANGEBYLEX key min max [LIMIT offset count]
        功能: 通過字典區(qū)間返回有序集合的成員
        返回值: 區(qū)間內(nèi)元素列表

        9> ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT]
        功能: 通過分數(shù)返回有序集合指定區(qū)間內(nèi)的成員
        返回值: 區(qū)間內(nèi)元素列表

        10> ZRANK key member
        功能: 返回有序集合中指定成員的索引
        返回值: member的排名或者 nil

        11> ZREM key member [member ...]
        功能: 移除有序集合中的一個或多個成員
        返回值: 成功移除的元素個數(shù)

        12> ZREMRANGEBYLEX key min max
        功能: 移除有序集合中給定的字典區(qū)間的所有成員
        返回值: 成功移除的元素個數(shù)

        13> ZREMRANGEBYRANK key start stop
        功能: 移除有序集合中給定的排名區(qū)間的所有成員
        返回值: 成功移除的元素個數(shù)

        14> ZREMRANGEBYSCORE key min max
        功能: 移除有序集合中給定的分數(shù)區(qū)間的所有成員
        返回值: 成功移除的元素個數(shù)

        15> ZREVRANGE key start stop [WITHSCORES]
        功能: 返回有序集中指定區(qū)間內(nèi)的成員,通過索引,分數(shù)從高到低
        返回值: 區(qū)間內(nèi)元素列表及分數(shù)

        16> ZREVRANGEBYSCORE key max min [WITHSCORES]
        功能: 返回有序集中指定分數(shù)區(qū)間內(nèi)的成員,分數(shù)從高到低排序
        返回值: 區(qū)間內(nèi)元素列表及分數(shù)

        17> ZREVRANK key member
        功能: 返回有序集合中指定成員的排名,有序集成員按分數(shù)值遞減(從大到小)排序
        返回值: member排名或者 nil

        18> ZSCORE key member
        功能: 返回有序集中,成員的分數(shù)值
        返回值: member分數(shù)

        19> ZUNIONSTORE destination numkeys key [key ...]
        功能: 計算給定的一個或多個有序集的并集,并存儲在新的 key 中
        返回值: 存儲到新key的元素個數(shù)

        20> ZSCAN key cursor [MATCH pattern] [COUNT count]
        功能: 迭代有序集合中的元素(包括元素成員和元素分值)
        返回值: 元素列表

        21> ZPOPMAX/ZPOPMIN/BZPOPMAX/BZPOPMIN


        一、zset 相關數(shù)據(jù)結(jié)構(gòu)

        zset 的實現(xiàn),使用了 ziplist, zskiplist 和 dict 進行實現(xiàn)。

        /* ZSETs use a specialized version of Skiplists */typedef struct zskiplistNode {    sds ele;    double score;    struct zskiplistNode *backward;    struct zskiplistLevel {        struct zskiplistNode *forward;        unsigned int span;    } level[];} zskiplistNode;// 跳躍鏈表typedef struct zskiplist {    struct zskiplistNode *header, *tail;    unsigned long length;    int level;} zskiplist;// zset 主數(shù)據(jù)結(jié)構(gòu),dict + zskiplisttypedef struct zset {    dict *dict;    zskiplist *zsl;} zset;// zset 在合適場景下,將先使用 ziplist 存儲數(shù)據(jù)typedef struct zlentry {    unsigned int prevrawlensize, prevrawlen;    unsigned int lensize, len;    unsigned int headersize;    unsigned char encoding;    unsigned char *p;} zlentry;

        二、zadd 添加成員操作

        從添加實現(xiàn)中,我們可以完整領略數(shù)據(jù)結(jié)構(gòu)的運用。

        // 用法: ZADD key score1 member1 [score2 member2]// t_zset.cvoid zaddCommand(client *c) {    // zadd 的多個參數(shù)變形, 使用 flags 進行區(qū)分復用    zaddGenericCommand(c,ZADD_NONE);}void zaddGenericCommand(client *c, int flags) {    static char *nanerr = "resulting score is not a number (NaN)";    robj *key = c->argv[1];    robj *zobj;    sds ele;    double score = 0, *scores = NULL, curscore = 0.0;    int j, elements;    int scoreidx = 0;    /* The following vars are used in order to track what the command actually     * did during the execution, to reply to the client and to trigger the     * notification of keyspace change. */    int added = 0;      /* Number of new elements added. */    int updated = 0;    /* Number of elements with updated score. */    int processed = 0;  /* Number of elements processed, may remain zero with                           options like XX. */
        /* Parse options. At the end 'scoreidx' is set to the argument position * of the score of the first score-element pair. */ // 從第三位置開始嘗試解析特殊標識(用法規(guī)范) // 按位與到 flags 中 scoreidx = 2; while(scoreidx < c->argc) { char *opt = c->argv[scoreidx]->ptr; // NX: 不更新已存在的元素,只做添加操作 if (!strcasecmp(opt,"nx")) flags |= ZADD_NX; // XX: 只做更新操作,不做添加操作 else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX; // CH: 將返回值從添加的新元素數(shù)修改為已更改元素的總數(shù)。更改的元素是第添加的新元素以及已為其更新分數(shù)的現(xiàn)有元素。因此,命令行中指定的具有與過去相同分數(shù)的元素將不計算在內(nèi)。注意:通常,ZADD的返回值僅計算添加的新元素的數(shù)量。 else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH; // INCR: 使用指定元素增加指定分數(shù), 與 ZINCRBY 類似,此場景下,只允許操作一個元素 else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR; else break; scoreidx++; }
        /* Turn options into simple to check vars. */ int incr = (flags & ZADD_INCR) != 0; int nx = (flags & ZADD_NX) != 0; int xx = (flags & ZADD_XX) != 0; int ch = (flags & ZADD_CH) != 0;
        /* After the options, we expect to have an even number of args, since * we expect any number of score-element pairs. */ // 把特殊標識去除后,剩下的參數(shù)列表應該2n數(shù),即 score-element 一一配對的,否則語法錯誤 elements = c->argc-scoreidx; if (elements % 2) { addReply(c,shared.syntaxerr); return; } elements /= 2; /* Now this holds the number of score-element pairs. */
        /* Check for incompatible options. */ // 互斥項 if (nx && xx) { addReplyError(c, "XX and NX options at the same time are not compatible"); return; } // 語法檢查,INCR 只能針對1個元素操作 if (incr && elements > 1) { addReplyError(c, "INCR option supports a single increment-element pair"); return; }
        /* Start parsing all the scores, we need to emit any syntax error * before executing additions to the sorted set, as the command should * either execute fully or nothing at all. */ // 解析所有的 score 值為double類型,賦值到 scores 中 scores = zmalloc(sizeof(double)*elements); for (j = 0; j < elements; j++) { if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL) != C_OK) goto cleanup; }
        /* Lookup the key and create the sorted set if does not exist. */ // 語法檢查 zobj = lookupKeyWrite(c->db,key); if (zobj == NULL) { if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */ // 創(chuàng)建原始key對象 // 默認 zset_max_ziplist_entries=OBJ_ZSET_MAX_ZIPLIST_ENTRIES: 128 // 默認 zset_max_ziplist_value=OBJ_ZSET_MAX_ZIPLIST_VALUE: 64 // 所以此處默認主要是檢查 第1個member的長度是大于 64 if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)) { // 2. 通用情況使用 dict+quicklist 型的zset zobj = createZsetObject(); } else { // 1. 元素比較小的情況下創(chuàng)建 ziplist 型的 zset zobj = createZsetZiplistObject(); } // 將對象添加到db中,后續(xù)所有操作針對 zobj 操作即是對db的操作 (引用傳遞) dbAdd(c->db,key,zobj); } else { if (zobj->type != OBJ_ZSET) { addReply(c,shared.wrongtypeerr); goto cleanup; } } // 一個個元素循環(huán)添加 for (j = 0; j < elements; j++) { score = scores[j];
        ele = c->argv[scoreidx+1+j*2]->ptr; // 分當前zobj的編碼不同進行添加 (ziplist, skiplist) // 3. ZIPLIST 編碼下的zset添加操作 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *eptr; // 3.1. 查找是否存在要添加的元素 (確定添加或更新) if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { if (nx) continue; if (incr) { score += curscore; if (isnan(score)) { addReplyError(c,nanerr); goto cleanup; } }
        /* Remove and re-insert when score changed. */ if (score != curscore) { // 3.2. 元素更新操作,先刪再插入 zobj->ptr = zzlDelete(zobj->ptr,eptr); zobj->ptr = zzlInsert(zobj->ptr,ele,score); server.dirty++; updated++; } processed++; } else if (!xx) { /* Optimize: check if the element is too large or the list * becomes too long *before* executing zzlInsert. */ zobj->ptr = zzlInsert(zobj->ptr,ele,score); // 5. 超過一條件后,做 ziplist->skiplist 轉(zhuǎn)換 // 默認 元素個數(shù)>128, 當前元素>64 // 這兩個判斷不會重復嗎??兩個原因: 1. 轉(zhuǎn)換函數(shù)內(nèi)部會重新判定; 2. 下一次循環(huán)時不會再走當前邏輯; if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); if (sdslen(ele) > server.zset_max_ziplist_value) zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); server.dirty++; added++; processed++; } } // 4. skiplist 下的zset元素添加 else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplistNode *znode; dictEntry *de; // 判斷ele是否已存在,使用hash查找,快速 de = dictFind(zs->dict,ele); if (de != NULL) { if (nx) continue; curscore = *(double*)dictGetVal(de);
        if (incr) { score += curscore; if (isnan(score)) { addReplyError(c,nanerr); /* Don't need to check if the sorted set is empty * because we know it has at least one element. */ goto cleanup; } }
        /* Remove and re-insert when score changes. */ // 先刪再插入 skiplist if (score != curscore) { zskiplistNode *node; serverAssert(zslDelete(zs->zsl,curscore,ele,&node)); znode = zslInsert(zs->zsl,score,node->ele); /* We reused the node->ele SDS string, free the node now * since zslInsert created a new one. */ node->ele = NULL; zslFreeNode(node); /* Note that we did not removed the original element from * the hash table representing the sorted set, so we just * update the score. */ // 更新dict中的分數(shù)引用 dictGetVal(de) = &znode->score; /* Update score ptr. */ server.dirty++; updated++; } processed++; } else if (!xx) { ele = sdsdup(ele); znode = zslInsert(zs->zsl,score,ele); // 添加skiplist的同時,也往 dict 中添加一份數(shù)據(jù),因為hash的查找永遠是最快的 serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); server.dirty++; added++; processed++; } } else { serverPanic("Unknown sorted set encoding"); } }
        reply_to_client: if (incr) { /* ZINCRBY or INCR option. */ if (processed) addReplyDouble(c,score); else addReply(c,shared.nullbulk); } else { /* ZADD. */ addReplyLongLong(c,ch ? added+updated : added); }
        cleanup: zfree(scores); if (added || updated) { signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_ZSET, incr ? "zincr" : "zadd", key, c->db->id); }}
        // 1. 元素比較小的情況下創(chuàng)建 ziplist 型的 zset// object.c, 創(chuàng)建ziplist 的zsetrobj *createZsetZiplistObject(void) { unsigned char *zl = ziplistNew(); robj *o = createObject(OBJ_ZSET,zl); o->encoding = OBJ_ENCODING_ZIPLIST; return o;}// 2. 創(chuàng)建通用的 zset 實例// object.crobj *createZsetObject(void) { zset *zs = zmalloc(sizeof(*zs)); robj *o; // zsetDictType 稍有不同 zs->dict = dictCreate(&zsetDictType,NULL); // 首次遇到 skiplist, 咱去瞅瞅是如何創(chuàng)建的 zs->zsl = zslCreate(); o = createObject(OBJ_ZSET,zs); o->encoding = OBJ_ENCODING_SKIPLIST; return o;}// server.c, zset創(chuàng)建時使用的dict類型,與hash有不同/* Sorted sets hash (note: a skiplist is used in addition to the hash table) */dictType zsetDictType = { dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ NULL, /* Note: SDS string shared & freed by skiplist */ NULL /* val destructor */};// 創(chuàng)建 skiplist 對象/* Create a new skiplist. */zskiplist *zslCreate(void) { int j; zskiplist *zsl;
        zsl = zmalloc(sizeof(*zsl)); zsl->level = 1; zsl->length = 0; // 創(chuàng)建header節(jié)點,ZSKIPLIST_MAXLEVEL 32 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); // 初始化header for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { zsl->header->level[j].forward = NULL; zsl->header->level[j].span = 0; } zsl->header->backward = NULL; zsl->tail = NULL; return zsl;}/* Create a skiplist node with the specified number of levels. * The SDS string 'ele' is referenced by the node after the call. */zskiplistNode *zslCreateNode(int level, double score, sds ele) { zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel)); zn->score = score; zn->ele = ele; return zn;}

        // 3. ZIPLIST 編碼下的zset添加操作// 3.1. 查找是否存在要添加的元素 (確定添加或更新)// t_zset.c, 查找指定eleunsigned char *zzlFind(unsigned char *zl, sds ele, double *score) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; // 遍歷所有ziplist // 可見,此時的ziplist并沒有表現(xiàn)出有序啊 while (eptr != NULL) { // eptr 相當于是 key // sptr 相當于score sptr = ziplistNext(zl,eptr); serverAssert(sptr != NULL);
        if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) { /* Matching element, pull out score. */ // 找到相應的 key 后,解析下一值,即 score if (score != NULL) *score = zzlGetScore(sptr); return eptr; } /* Move to next element. */ // 移動兩次對象,才會到下一元素(因為存儲是 key-score 相鄰存儲) eptr = ziplistNext(zl,sptr); } return NULL;}// t_zset.c, 獲取元素的scoredouble zzlGetScore(unsigned char *sptr) { unsigned char *vstr; unsigned int vlen; long long vlong; char buf[128]; double score;
        serverAssert(sptr != NULL); serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong)); // 帶小數(shù)點不帶小數(shù)點 if (vstr) { memcpy(buf,vstr,vlen); buf[vlen] = '\0'; // 做類型轉(zhuǎn)換 score = strtod(buf,NULL); } else { score = vlong; }
        return score;}
        // 3.2. 元素更新操作,先刪再插入// t_zset.c/* Delete (element,score) pair from ziplist. Use local copy of eptr because we * don't want to modify the one given as argument. */unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) { unsigned char *p = eptr;
        /* TODO: add function to ziplist API to delete N elements from offset. */ zl = ziplistDelete(zl,&p); zl = ziplistDelete(zl,&p); return zl;}// 添加 ele-score 到 ziplist 中/* Insert (element,score) pair in ziplist. This function assumes the element is * not yet present in the list. */unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; double s; // 在上面查找時,我們看到ziplist也是遍歷,以為是無序的ziplist // 然而實際上,插入時是維護了順序的喲 while (eptr != NULL) { sptr = ziplistNext(zl,eptr); serverAssert(sptr != NULL); s = zzlGetScore(sptr); // 找到第一個比score大的位置,在其前面插入 ele-score if (s > score) { /* First element with score larger than score for element to be * inserted. This means we should take its spot in the list to * maintain ordering. */ zl = zzlInsertAt(zl,eptr,ele,score); break; } else if (s == score) { /* Ensure lexicographical ordering for elements. */ // 當分數(shù)相同時,按字典順序排列 if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) { zl = zzlInsertAt(zl,eptr,ele,score); break; } }
        /* Move to next element. */ eptr = ziplistNext(zl,sptr); }
        /* Push on tail of list when it was not yet inserted. */ // 以上遍歷完成都沒有找到相應位置,說明當前score是最大值,將其插入尾部 if (eptr == NULL) zl = zzlInsertAt(zl,NULL,ele,score); return zl;}// 在eptr的前面插入 ele-scoreunsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) { unsigned char *sptr; char scorebuf[128]; int scorelen; size_t offset;
        scorelen = d2string(scorebuf,sizeof(scorebuf),score); if (eptr == NULL) { // 直接插入到尾部 zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL); zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL); } else { /* Keep offset relative to zl, as it might be re-allocated. */ offset = eptr-zl; // 直接在 eptr 位置添加 ele, 其他元素后移 zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele)); eptr = zl+offset;
        /* Insert score after the element. */ // 此時的 eptr 已經(jīng)插入ele之后的位置,后移一位后,就可以找到 score 的存儲位置 serverAssert((sptr = ziplistNext(zl,eptr)) != NULL); zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen); } return zl;}
        // 4. skiplist 下的zset元素添加// 4.1. 添加元素// t_zset.c, 添加 ele-score 到 skiplist 中/* Insert a new node in the skiplist. Assumes the element does not already * exist (up to the caller to enforce that). The skiplist takes ownership * of the passed SDS string 'ele'. */zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { // ZSKIPLIST_MAXLEVEL 32 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level;
        serverAssert(!isnan(score)); x = zsl->header; // 初始 zsl->level = 1 // 從header的最高層開始遍歷 for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ // 計算出每層可以插入的位置 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; // 當前l(fā)evel的score小于需要添加的元素時,往前推進skiplist while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } update[i] = x; } /* we assume the element is not already inside, since we allow duplicated * scores, reinserting the same element should never happen since the * caller of zslInsert() should test in the hash table if the element is * already inside or not. */ // 得到一隨機的level, 決定要寫的節(jié)點數(shù) // 如果當前的level過小,則變更level, 重新初始化大的level level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; } // 構(gòu)建新的 skiplist 節(jié)點,為每一層節(jié)點添加同樣的數(shù)據(jù) x = zslCreateNode(level,score,ele); for (i = 0; i < level; i++) { // 讓i層的節(jié)點與x關聯(lián) x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x;
        /* update span covered by update[i] as x is inserted here */ x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; }
        /* increment span for untouched levels */ // 如果當前l(fā)evel較小,則存在有的level未賦值情況,需要主動+1 for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } // 關聯(lián)好header后,設置backward指針 x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else // 同有后繼節(jié)點,說明是尾節(jié)點,賦值tail zsl->tail = x; zsl->length++; return x;}


        ziplist添加沒啥好說的,skiplist可以稍微提提,大體步驟為四步: 

        1. 找位置, 從最高層開始, 判斷是否后繼節(jié)點小,如果小則直接在本層迭代,否則轉(zhuǎn)到下一層迭代; (每一層都要迭代至相應的位置)
        2. 計算得到一新的隨機level,用于決定當前節(jié)點的層級;
        3. 依次對每一層與原跳表做關聯(lián);
        4. 設置backward指針;(雙向鏈表)

        相對說,skiplist 還是有點抽象,我們畫個圖來描述下上面的操作:

        // 補充,我們看一下隨機level的計算算法// t_zset.c/* Returns a random level for the new skiplist node we are going to create. * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL * (both inclusive), with a powerlaw-alike distribution where higher * levels are less likely to be returned. */int zslRandomLevel(void) {    int level = 1;    // n次隨機值得到 level, ZSKIPLIST_P:0.25    // 按隨機概率,應該是有1/4的命中概率(如果不是呢??)    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))        level += 1;    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;}

        先看插入過程的目的,主要是為了先理解 skiplist 的構(gòu)造過程。而在zset的更新過程,是先刪除原節(jié)點,再進行插入的這么個過程。所以咱們還是有必要再來看看 skiplist 的刪除節(jié)點過程。

        // t_zset.c, 刪除skiplist的指定節(jié)點/* Delete an element with matching score/element from the skiplist. * The function returns 1 if the node was found and deleted, otherwise * 0 is returned. * * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise * it is not freed (but just unlinked) and *node is set to the node pointer, * so that it is possible for the caller to reuse the node (including the * referenced SDS string at node->ele). */int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;    int i;
        x = zsl->header; // 與添加時查找對應位置一樣,先進行遍歷,找到最每個層級最接近 node 的位置 for (i = zsl->level-1; i >= 0; i--) { while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { x = x->level[i].forward; } update[i] = x; } /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */ // 進行精確比對,相同才進行刪除 x = x->level[0].forward; if (x && score == x->score && sdscmp(x->ele,ele) == 0) { // 執(zhí)行刪除動作 zslDeleteNode(zsl, x, update); if (!node) zslFreeNode(x); else *node = x; return 1; } return 0; /* not found */}// 刪除 x對應的節(jié)點// update 是node的每一層級對應的前驅(qū)節(jié)點/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { int i; for (i = 0; i < zsl->level; i++) { if (update[i]->level[i].forward == x) { update[i]->level[i].span += x->level[i].span - 1; update[i]->level[i].forward = x->level[i].forward; } else { // 不相等說明該層不存在指向 x 的引用 update[i]->level[i].span -= 1; } } // 更新第0層尾節(jié)點指針 if (x->level[0].forward) { x->level[0].forward->backward = x->backward; } else { zsl->tail = x->backward; } // 降低 skiplist 的層級,直到第一個非空的節(jié)點為止 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL) zsl->level--; zsl->length--;}

        skiplist 刪除過程的示意圖如下:

        最后,我們再來看另一種情況,即zset發(fā)生編碼轉(zhuǎn)換時,是如何做的。即如何從 ziplist 轉(zhuǎn)換到 skiplist 中呢?

        // t_zset.c, 編碼類型轉(zhuǎn)換void zsetConvert(robj *zobj, int encoding) {    zset *zs;    zskiplistNode *node, *next;    sds ele;    double score;    // 編碼相同,直接返回    if (zobj->encoding == encoding) return;    // ziplist -> skiplist 轉(zhuǎn)換    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {        unsigned char *zl = zobj->ptr;        unsigned char *eptr, *sptr;        unsigned char *vstr;        unsigned int vlen;        long long vlong;
        if (encoding != OBJ_ENCODING_SKIPLIST) serverPanic("Unknown target encoding");
        zs = zmalloc(sizeof(*zs)); zs->dict = dictCreate(&zsetDictType,NULL); zs->zsl = zslCreate();
        eptr = ziplistIndex(zl,0); serverAssertWithInfo(NULL,zobj,eptr != NULL); sptr = ziplistNext(zl,eptr); serverAssertWithInfo(NULL,zobj,sptr != NULL);
        while (eptr != NULL) { score = zzlGetScore(sptr); serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); if (vstr == NULL) ele = sdsfromlonglong(vlong); else ele = sdsnewlen((char*)vstr,vlen); // 依次插入 skiplist 和 dict 中即可 node = zslInsert(zs->zsl,score,ele); serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK); // zzlNext 封裝了同時迭代 eptr 和 sptr 方法 zzlNext(zl,&eptr,&sptr); }
        zfree(zobj->ptr); zobj->ptr = zs; zobj->encoding = OBJ_ENCODING_SKIPLIST; } // skiplist -> ziplist 逆向轉(zhuǎn)換 else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { unsigned char *zl = ziplistNew(); if (encoding != OBJ_ENCODING_ZIPLIST) serverPanic("Unknown target encoding");
        /* Approach similar to zslFree(), since we want to free the skiplist at * the same time as creating the ziplist. */ zs = zobj->ptr; dictRelease(zs->dict); node = zs->zsl->header->level[0].forward; zfree(zs->zsl->header); zfree(zs->zsl); // 正向迭代轉(zhuǎn)換 while (node) { zl = zzlInsertAt(zl,NULL,node->ele,node->score); next = node->level[0].forward; zslFreeNode(node); node = next; }
        zfree(zs); zobj->ptr = zl; zobj->encoding = OBJ_ENCODING_ZIPLIST; } else { serverPanic("Unknown sorted set encoding"); }}// 基于ziplist, 同時迭代 ele-score/* Move to next entry based on the values in eptr and sptr. Both are set to * NULL when there is no next entry. */void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) { unsigned char *_eptr, *_sptr; serverAssert(*eptr != NULL && *sptr != NULL);
        _eptr = ziplistNext(zl,*sptr); if (_eptr != NULL) { _sptr = ziplistNext(zl,_eptr); serverAssert(_sptr != NULL); } else { /* No next entry. */ _sptr = NULL; }
        *eptr = _eptr; *sptr = _sptr;}

        至此,整個添加過程結(jié)束。本身是不太復雜的,主要針對 ziplist 和 skiplist 的分別處理(注意有逆向編碼)。但為了講清整體關系,稍顯雜亂。

        三、zrange 范圍查詢

        范圍查詢功能,redis提供了好幾個,zrange/zrangebyscore/zrangebylex... 應該說查詢方式都不太一樣,不過我們也不必糾結(jié)這些,只管理會大概就行。就挑一個以 下標進行范圍查詢的實現(xiàn)講解下就行。

        // 用法: ZRANGE key start stop [WITHSCORES]// t_zset.cvoid zrangeCommand(client *c) {    zrangeGenericCommand(c,0);}
        void zrangeGenericCommand(client *c, int reverse) { robj *key = c->argv[1]; robj *zobj; int withscores = 0; long start; long end; int llen; int rangelen;
        if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
        if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) { withscores = 1; } else if (c->argc >= 5) { addReply(c,shared.syntaxerr); return; }
        if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL || checkType(c,zobj,OBJ_ZSET)) return;
        /* Sanitize indexes. */ // 小于0,則代表反向查詢,但實際的輸出順序不是按此值運算的(提供了 reverse 方法) llen = zsetLength(zobj); if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0;
        /* Invariant: start >= 0, so this test will be true when end < 0. * The range is empty when start > end or start >= length. */ if (start > end || start >= llen) { addReply(c,shared.emptymultibulk); return; } if (end >= llen) end = llen-1; rangelen = (end-start)+1;
        /* Return the result in form of a multi-bulk reply */ addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen); // 同樣,分 ZIPLIST 和 SKIPLIST 編碼分別實現(xiàn) if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = zobj->ptr; unsigned char *eptr, *sptr; unsigned char *vstr; unsigned int vlen; long long vlong; // ziplist 以 ele-score 方式存儲,所以步長是 2 if (reverse) eptr = ziplistIndex(zl,-2-(2*start)); else eptr = ziplistIndex(zl,2*start);
        serverAssertWithInfo(c,zobj,eptr != NULL); sptr = ziplistNext(zl,eptr); // 依次迭代輸出 while (rangelen--) { serverAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL); serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); if (vstr == NULL) addReplyBulkLongLong(c,vlong); else addReplyBulkCBuffer(c,vstr,vlen);
        if (withscores) addReplyDouble(c,zzlGetScore(sptr)); // ziplist 提供正向迭代,返回迭代功能,其實就是 offset的加減問題 if (reverse) zzlPrev(zl,&eptr,&sptr); else zzlNext(zl,&eptr,&sptr); }
        } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; sds ele;
        /* Check if starting point is trivial, before doing log(N) lookup. */ // 反向使用 tail 迭代,否則使用header迭代 if (reverse) { ln = zsl->tail; if (start > 0) // 獲取下標元素應該只是一個迭代循環(huán)問題,不過還是稍微細看一下skiplist實現(xiàn) ln = zslGetElementByRank(zsl,llen-start); } else { ln = zsl->header->level[0].forward; if (start > 0) ln = zslGetElementByRank(zsl,start+1); }
        while(rangelen--) { serverAssertWithInfo(c,zobj,ln != NULL); ele = ln->ele; addReplyBulkCBuffer(c,ele,sdslen(ele)); if (withscores) addReplyDouble(c,ln->score); // 直接正向或反向迭代即可 ln = reverse ? ln->backward : ln->level[0].forward; } } else { serverPanic("Unknown sorted set encoding"); }}// 根據(jù)排名查找元素/* Finds an element by its rank. The rank argument needs to be 1-based. */zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) { zskiplistNode *x; unsigned long traversed = 0; int i;
        x = zsl->header; // 好像沒有相像中的簡單哦 // 請仔細品 for (i = zsl->level-1; i >= 0; i--) { while (x->level[i].forward && (traversed + x->level[i].span) <= rank) { // span 的作用?? traversed += x->level[i].span; x = x->level[i].forward; } if (traversed == rank) { return x; } } return NULL;}

        根據(jù)范圍查找元素,整體是比較簡單,迭代輸出而已。只是 skiplist 的span維護,得好好想想。

        四、zrembyscore 根據(jù)分數(shù)刪除元素

        zrembyscore, 首先這是個刪除命令,其實它是根據(jù)分數(shù)查詢,我們可以同時解析這兩種情況。

        // t_zset.c, void zremrangebyscoreCommand(client *c) {    // 幾個范圍刪除,都復用 zremrangeGenericCommand    // ZRANGE_RANK/ZRANGE_SCORE/ZRANGE_LEX    zremrangeGenericCommand(c,ZRANGE_SCORE);}void zremrangeGenericCommand(client *c, int rangetype) {    robj *key = c->argv[1];    robj *zobj;    int keyremoved = 0;    unsigned long deleted = 0;    // score 存儲使用另外的數(shù)據(jù)結(jié)構(gòu)    zrangespec range;    zlexrangespec lexrange;    long start, end, llen;
        /* Step 1: Parse the range. */ // 解析參數(shù),除了 rank 方式的查詢,其他兩個都使用 另外的專門數(shù)據(jù)結(jié)構(gòu)存儲參數(shù) if (rangetype == ZRANGE_RANK) { if ((getLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK) || (getLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK)) return; } else if (rangetype == ZRANGE_SCORE) { if (zslParseRange(c->argv[2],c->argv[3],&range) != C_OK) { addReplyError(c,"min or max is not a float"); return; } } else if (rangetype == ZRANGE_LEX) { if (zslParseLexRange(c->argv[2],c->argv[3],&lexrange) != C_OK) { addReplyError(c,"min or max not valid string range item"); return; } }
        /* Step 2: Lookup & range sanity checks if needed. */ if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL || checkType(c,zobj,OBJ_ZSET)) goto cleanup;
        if (rangetype == ZRANGE_RANK) { /* Sanitize indexes. */ llen = zsetLength(zobj); if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0;
        /* Invariant: start >= 0, so this test will be true when end < 0. * The range is empty when start > end or start >= length. */ if (start > end || start >= llen) { addReply(c,shared.czero); goto cleanup; } if (end >= llen) end = llen-1; }
        /* Step 3: Perform the range deletion operation. */ if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { // 針對不同的刪除類型,使用不同的刪除方法 // 所以,這段代碼的復用體現(xiàn)在哪里呢??? switch(rangetype) { case ZRANGE_RANK: zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted); break; case ZRANGE_SCORE: // 3.1. 我們只看 score 的刪除 --ziplist zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,&range,&deleted); break; case ZRANGE_LEX: zobj->ptr = zzlDeleteRangeByLex(zobj->ptr,&lexrange,&deleted); break; } if (zzlLength(zobj->ptr) == 0) { dbDelete(c->db,key); keyremoved = 1; } } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; switch(rangetype) { case ZRANGE_RANK: deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict); break; case ZRANGE_SCORE: // 3.2. skiplist 的刪除rangeByScore 方法 deleted = zslDeleteRangeByScore(zs->zsl,&range,zs->dict); break; case ZRANGE_LEX: deleted = zslDeleteRangeByLex(zs->zsl,&lexrange,zs->dict); break; } if (htNeedsResize(zs->dict)) dictResize(zs->dict); if (dictSize(zs->dict) == 0) { dbDelete(c->db,key); keyremoved = 1; } } else { serverPanic("Unknown sorted set encoding"); }
        /* Step 4: Notifications and reply. */ if (deleted) { char *event[3] = {"zremrangebyrank","zremrangebyscore","zremrangebylex"}; signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_ZSET,event[rangetype],key,c->db->id); if (keyremoved) notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); } server.dirty += deleted; addReplyLongLong(c,deleted);
        cleanup: if (rangetype == ZRANGE_LEX) zslFreeLexRange(&lexrange);}// server.h, 范圍查詢參數(shù)存儲/* Struct to hold a inclusive/exclusive range spec by score comparison. */typedef struct { double min, max; int minex, maxex; /* are min or max exclusive? */} zrangespec;
        // 3.1. ziplist 的刪除range方法// t_zset.cunsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) { unsigned char *eptr, *sptr; double score; unsigned long num = 0; if (deleted != NULL) *deleted = 0; // 找到首個在范圍內(nèi)的指針,進行迭代 eptr = zzlFirstInRange(zl,range); if (eptr == NULL) return zl;
        /* When the tail of the ziplist is deleted, eptr will point to the sentinel * byte and ziplistNext will return NULL. */ while ((sptr = ziplistNext(zl,eptr)) != NULL) { score = zzlGetScore(sptr); // 肯定是比 min 大的,所以只需確認比 max 小即可 if (zslValueLteMax(score,range)) { /* Delete both the element and the score. */ zl = ziplistDelete(zl,&eptr); zl = ziplistDelete(zl,&eptr); num++; } else { /* No longer in range. */ break; } }
        if (deleted != NULL) *deleted = num; return zl;}
        /* Find pointer to the first element contained in the specified range. * Returns NULL when no element is contained in the range. */unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; double score;
        /* If everything is out of range, return early. */ // 比較第1個元素和最后 一個元素,即可確認是否在范圍內(nèi) if (!zzlIsInRange(zl,range)) return NULL;
        while (eptr != NULL) { sptr = ziplistNext(zl,eptr); serverAssert(sptr != NULL);
        score = zzlGetScore(sptr); // score >= min if (zslValueGteMin(score,range)) { /* Check if score <= max. */ if (zslValueLteMax(score,range)) return eptr; return NULL; }
        /* Move to next element. */ eptr = ziplistNext(zl,sptr); }
        return NULL;}// 檢查zl是否在range范圍內(nèi)// 檢查第1個分數(shù)和最后一個數(shù)即可/* Returns if there is a part of the zset is in range. Should only be used * internally by zzlFirstInRange and zzlLastInRange. */int zzlIsInRange(unsigned char *zl, zrangespec *range) { unsigned char *p; double score;
        /* Test for ranges that will always be empty. */ if (range->min > range->max || (range->min == range->max && (range->minex || range->maxex))) return 0;
        p = ziplistIndex(zl,-1); /* Last score. */ if (p == NULL) return 0; /* Empty sorted set */ score = zzlGetScore(p); // scoreMax >= min if (!zslValueGteMin(score,range)) return 0;
        p = ziplistIndex(zl,1); /* First score. */ serverAssert(p != NULL); score = zzlGetScore(p); // scoreMin <= max if (!zslValueLteMax(score,range)) return 0;
        return 1;}
        // 3.2. 刪除 skiplist 中的range元素/* Delete all the elements with score between min and max from the skiplist. * Min and max are inclusive, so a score >= min || score <= max is deleted. * Note that this function takes the reference to the hash table view of the * sorted set, in order to remove the elements from the hash table too. */unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned long removed = 0; int i;
        x = zsl->header; // 找出每層小于 range->min 的元素 for (i = zsl->level-1; i >= 0; i--) { while (x->level[i].forward && (range->minex ? x->level[i].forward->score <= range->min : x->level[i].forward->score < range->min)) x = x->level[i].forward; update[i] = x; }
        /* Current node is the last with score < or <= min. */ x = x->level[0].forward; // 從第0層開始,依次刪除引用,刪除元素 // 同有找到符合條件的元素時,一次循環(huán)也不會成立 /* Delete nodes while in range. */ while (x && (range->maxex ? x->score < range->max : x->score <= range->max)) { // 保留下一次迭代 zskiplistNode *next = x->level[0].forward; zslDeleteNode(zsl,x,update); // 同步刪除 dict 數(shù)據(jù) dictDelete(dict,x->ele); zslFreeNode(x); /* Here is where x->ele is actually released. */ removed++; x = next; } return removed;}

        刪除的邏輯比較清晰,ziplist和skiplist分開處理。大體思路相同是:找到第一個符合條件的元素,然后迭代,直到第一個不符合條件的元素為止。

         set雖然從定義上與zset有很多相通之處,然而在實現(xiàn)上卻是截然不同的。由于很多東西和之前介紹的知識有重合的地方,也沒啥好特別說的。zset 的解析差不多就到這里了。

        你覺得zset還有什么有意思的實現(xiàn)呢?歡迎討論。




        往期精彩推薦



        騰訊、阿里、滴滴后臺面試題匯總總結(jié) — (含答案)

        面試:史上最全多線程面試題 !

        最新阿里內(nèi)推Java后端面試題

        JVM難學?那是因為你沒認真看完這篇文章


        END


        關注作者微信公眾號 —《JAVA爛豬皮》


        了解更多java后端架構(gòu)知識以及最新面試寶典


        你點的每個好看,我都認真當成了


        看完本文記得給作者點贊+在看哦~~~大家的支持,是作者源源不斷出文的動力


        作者:等你歸去來

        出處:https://www.cnblogs.com/yougewe/p/12253982.html

        瀏覽 40
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            免看一级一片 | 北条麻妃内射 | 嗯~啊~快点死我在线观看 | 午夜精品18| 五月天综合激情网 | 日本艹逼视频 | 秋霞日韩在线 | 在线免费黄色小视频 | 少妇五月天激情 | 天天日天天舔天天射 |