1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Redis(四):del/unlink 命令源碼解析

        共 16428字,需瀏覽 33分鐘

         ·

        2021-03-04 08:22

        走過路過不要錯過

        點擊藍(lán)字關(guān)注我們


        上一篇文章從根本上理解了set/get的處理過程,相當(dāng)于理解了 增、改、查的過程,現(xiàn)在就差一個刪了。本篇我們來看一下刪除過程。

        對于客戶端來說,刪除操作無需區(qū)分何種數(shù)據(jù)類型,只管進(jìn)行 del 操作即可。

        零、刪除命令 del 的定義

        主要有兩個: del/unlink, 差別是 unlink 速度會更快, 因為其使用了異步刪除優(yōu)化模式, 其定義如下:

         // 標(biāo)識只有一個 w, 說明就是一個普通的寫操作,沒啥好說的    {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0}    // 標(biāo)識為 wF, 說明它是一個快速寫的操作,其實就是有一個異步優(yōu)化的過程,稍后詳解    {"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0}


        一、delCommand

        delCommand 的作用就是直接刪除某個 key 的數(shù)據(jù),釋放內(nèi)存即可。

        // db.c, del 命令處理    void delCommand(client *c) {    // 同步刪除    delGenericCommand(c,0);}
        /* This command implements DEL and LAZYDEL. */void delGenericCommand(client *c, int lazy) { int numdel = 0, j;
        for (j = 1; j < c->argc; j++) { // 自動過期數(shù)據(jù)清理 expireIfNeeded(c->db,c->argv[j]); // 此處分同步刪除和異步刪除, 主要差別在于對于復(fù)雜數(shù)據(jù)類型的刪除方面,如hash,list,set... // 針對 string 的刪除是完全一樣的 int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) : dbSyncDelete(c->db,c->argv[j]); // 寫命令的傳播問題 if (deleted) { signalModifiedKey(c->db,c->argv[j]); notifyKeyspaceEvent(NOTIFY_GENERIC, "del",c->argv[j],c->db->id); server.dirty++; numdel++; } } // 響應(yīng)刪除數(shù)據(jù)量, 粒度到 key 級別 addReplyLongLong(c,numdel);}

        框架代碼一看即明,只是相比于我們普通的刪除是多了不少事情。否則也不存在設(shè)計了。

        二、unlinkCommand

        如下,其實和del是一毛一樣的,僅是變化了一個 lazy 標(biāo)識而已。

        // db.c, unlink 刪除處理void unlinkCommand(client *c) {    // 與 del 一致,只是 lazy 標(biāo)識不一樣    delGenericCommand(c,1);}

        三、刪除數(shù)據(jù)過程詳解

        刪除數(shù)據(jù)分同步和異步兩種實現(xiàn)方式,道理都差不多,只是一個是后臺刪一個是前臺刪。我們分別來看看。


        1. 同步刪除 dbSyncDelete

        同步刪除很簡單,只要把對應(yīng)的key刪除,val刪除就行了,如果有內(nèi)層引用,則進(jìn)行遞歸刪除即可。

        // db.c, 同步刪除數(shù)據(jù)/* Delete a key, value, and associated expiration entry if any, from the DB */int dbSyncDelete(redisDb *db, robj *key) {    /* Deleting an entry from the expires dict will not free the sds of     * the key, because it is shared with the main dictionary. */    // 首先從 expires 隊列刪除,然后再從 db->dict 中刪除    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);    if (dictDelete(db->dict,key->ptr) == DICT_OK) {        if (server.cluster_enabled) slotToKeyDel(key);        return 1;    } else {        return 0;    }}// dict.c, 如上, 僅僅是 dictDelete() 就可以了,所以真正的刪除動作是在 dict 中實現(xiàn)的。int dictDelete(dict *ht, const void *key) {    // nofree: 0, 即要求釋放內(nèi)存    return dictGenericDelete(ht,key,0);}// dict.c, nofree: 0:要釋放相應(yīng)的val內(nèi)存, 1:不釋放相應(yīng)val內(nèi)存只刪除key/* Search and remove an element */static int dictGenericDelete(dict *d, const void *key, int nofree){    unsigned int h, idx;    dictEntry *he, *prevHe;    int table;
        if (d->ht[0].size == 0) return DICT_ERR; /* d->ht[0].table is NULL */ if (dictIsRehashing(d)) _dictRehashStep(d); h = dictHashKey(d, key); // ht[0] 和 ht[1] 如有可能都進(jìn)行掃描 for (table = 0; table <= 1; table++) { idx = h & d->ht[table].sizemask; he = d->ht[table].table[idx]; prevHe = NULL; while(he) { if (dictCompareKeys(d, key, he->key)) { /* Unlink the element from the list */ if (prevHe) prevHe->next = he->next; else d->ht[table].table[idx] = he->next; // no nofree, 就是要 free 內(nèi)存咯 if (!nofree) { // 看起來 key/value 需要單獨釋放內(nèi)存哦 dictFreeKey(d, he); dictFreeVal(d, he); } zfree(he); d->ht[table].used--; return DICT_OK; } prevHe = he; he = he->next; } // 如果沒有進(jìn)行 rehashing, 只需掃描0就行了 if (!dictIsRehashing(d)) break; } return DICT_ERR; /* not found */}

        其實對于有GC收集器的語言來說,根本不用關(guān)注內(nèi)存的釋放問題,自有后臺工具處理,然而對于 c 語言這種級別語言,則是需要自行關(guān)注內(nèi)存的。這也是本文存在的意義,不然對于一個 hash 表的元素刪除操作,如上很難嗎?并沒有。

        下面,我們就來看看 redis 是如何具體釋放內(nèi)存的吧。

        // dict.h, 釋放key, value 的邏輯也是非常簡單,用一個宏就定義好了// 釋放依賴于 keyDestructor, valDestructor#define dictFreeKey(d, entry) \    if ((d)->type->keyDestructor) \        (d)->type->keyDestructor((d)->privdata, (entry)->key)#define dictFreeVal(d, entry) \    if ((d)->type->valDestructor) \        (d)->type->valDestructor((d)->privdata, (entry)->v.val)// 所以,我們有必要回去看看 key,value 的析構(gòu)方法// 而這,又依賴于具體的數(shù)據(jù)類型,也就是你在 setXXX 的時候用到的數(shù)據(jù)類型// 我們看一下這個 keyDestructor,valDestructor 初始化的樣子// server.c  kv的析構(gòu)函數(shù)定義/* Db->dict, keys are sds strings, vals are Redis objects. */dictType dbDictType = {    dictSdsHash,                /* hash function */    NULL,                       /* key dup */    NULL,                       /* val dup */    dictSdsKeyCompare,          /* key compare */    dictSdsDestructor,          /* key destructor */    dictObjectDestructor   /* val destructor */};
        // 1. 先看看 key destructor, key 的釋放// server.c, 直接調(diào)用 sds 提供的服務(wù)即可void dictSdsDestructor(void *privdata, void *val){ DICT_NOTUSED(privdata); // sds 直接釋放key就行了 sdsfree(val);}// sds.c, 真正釋放 value 內(nèi)存/* Free an sds string. No operation is performed if 's' is NULL. */void sdsfree(sds s) { if (s == NULL) return; // zfree, 確實很簡單嘛, 因為 sds 是連續(xù)的內(nèi)存空間,直接使用系統(tǒng)提供的方法即可刪除 s_free((char*)s-sdsHdrSize(s[-1]));}
        // 2. value destructor 對value的釋放, 如果說 key 一定是string格式的話,value可主不一定了,因為 redis提供豐富的數(shù)據(jù)類型呢// server.cvoid dictObjectDestructor(void *privdata, void *val){ DICT_NOTUSED(privdata);
        if (val == NULL) return; /* Lazy freeing will set value to NULL. */ decrRefCount(val);}// 減少 value 的引用計數(shù)void decrRefCount(robj *o) { if (o->refcount == 1) { switch(o->type) { // string 類型 case OBJ_STRING: freeStringObject(o); break; // list 類型 case OBJ_LIST: freeListObject(o); break; // set 類型 case OBJ_SET: freeSetObject(o); break; // zset 類型 case OBJ_ZSET: freeZsetObject(o); break; // hash 類型 case OBJ_HASH: freeHashObject(o); break; default: serverPanic("Unknown object type"); break; } zfree(o); } else { if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0"); if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--; }}

        額,可以看出,對key的釋放自然是簡單之極。而對 value 則謹(jǐn)慎許多,首先它表面上只對引用做減操作。只有發(fā)只剩下1個引用即只有當(dāng)前引用的情況下,本次釋放就是最后一次釋放,所以才會回收內(nèi)存。

        // 在介紹不同數(shù)據(jù)類型的內(nèi)存釋放前,我們可以先來看下每個元素的數(shù)據(jù)結(jié)構(gòu)// dict.htypedef struct dictEntry {    // 存儲 key 字段內(nèi)容    void *key;    // 用一個聯(lián)合體存儲value    union {        // 存儲數(shù)據(jù)時使用 *val 存儲        void *val;        uint64_t u64;        // 存儲過期時間時使用該字段        int64_t s64;        // 存儲 score 時使用        double d;    } v;    // 存在hash沖突時,作鏈表使用    struct dictEntry *next;} dictEntry;
        // 1. string 類型的釋放// object.cvoid freeStringObject(robj *o) { // 直接調(diào)用 sds服務(wù)釋放 if (o->encoding == OBJ_ENCODING_RAW) { sdsfree(o->ptr); }}
        // 2. list 類型的釋放// object.cvoid freeListObject(robj *o) { switch (o->encoding) { case OBJ_ENCODING_QUICKLIST: quicklistRelease(o->ptr); break; default: serverPanic("Unknown list encoding type"); }}// quicklist.c/* Free entire quicklist. */void quicklistRelease(quicklist *quicklist) { unsigned long len; quicklistNode *current, *next;
        current = quicklist->head; len = quicklist->len; // 鏈表依次迭代就可以釋放完成了 while (len--) { next = current->next; // 釋放list具體值 zfree(current->zl); quicklist->count -= current->count; // 釋放list對象 zfree(current);
        quicklist->len--; current = next; } zfree(quicklist);}
        // 3. set 類型的釋放// object.c, set 分兩種類型, ht, intsetvoid freeSetObject(robj *o) { switch (o->encoding) { case OBJ_ENCODING_HT: // hash 類型則需要刪除每個 hash 的 kv dictRelease((dict*) o->ptr); break; case OBJ_ENCODING_INTSET: // intset 直接釋放 zfree(o->ptr); break; default: serverPanic("Unknown set encoding type"); }}// dict.c, /* Clear & Release the hash table */void dictRelease(dict *d){ // ht[0],ht[1] 依次清理 _dictClear(d,&d->ht[0],NULL); _dictClear(d,&d->ht[1],NULL); zfree(d);}// dict.c, /* Destroy an entire dictionary */int _dictClear(dict *d, dictht *ht, void(callback)(void *)) { unsigned long i;
        /* Free all the elements */ for (i = 0; i < ht->size && ht->used > 0; i++) { dictEntry *he, *nextHe;
        if (callback && (i & 65535) == 0) callback(d->privdata); // 元素為空,hash未命中,但只要 used > 0, 代表就還有需要刪除的元素存在 // 其實對于只有少數(shù)幾個元素的情況下,這個效率就呵呵了 if ((he = ht->table[i]) == NULL) continue; while(he) { nextHe = he->next; // 這里的釋放 kv 邏輯和前面是一致的 // 看起來像是遞歸,其實不然,因為redis不存在數(shù)據(jù)類型嵌套問題,比如 hash下存儲hash, 所以不會存在遞歸 // 具體結(jié)構(gòu)會在后續(xù)解讀到 dictFreeKey(d, he); dictFreeVal(d, he); zfree(he); ht->used--; he = nextHe; } } /* Free the table and the allocated cache structure */ zfree(ht->table); /* Re-initialize the table */ _dictReset(ht); return DICT_OK; /* never fails */}
        // 4. hash 類型的釋放// object.c, hash和set其實是很相似的,代碼也做了大量的復(fù)用void freeHashObject(robj *o) { switch (o->encoding) { case OBJ_ENCODING_HT: // ht 形式與set一致 dictRelease((dict*) o->ptr); break; case OBJ_ENCODING_ZIPLIST: // ziplist 直接釋放 zfree(o->ptr); break; default: serverPanic("Unknown hash encoding type"); break; }}
        // 5. zset 類型的釋放// object.c, zset 的存儲形式與其他幾個void freeZsetObject(robj *o) { zset *zs; switch (o->encoding) { case OBJ_ENCODING_SKIPLIST: zs = o->ptr; // 釋放dict 數(shù)據(jù), ht 0,1 的釋放 dictRelease(zs->dict); // 釋放skiplist 數(shù)據(jù), 主要看下這個 zslFree(zs->zsl); zfree(zs); break; case OBJ_ENCODING_ZIPLIST: zfree(o->ptr); break; default: serverPanic("Unknown sorted set encoding"); }}// t_zset.c, 釋放跳表數(shù)據(jù)/* Free a whole skiplist. */void zslFree(zskiplist *zsl) { zskiplistNode *node = zsl->header->level[0].forward, *next;
        zfree(zsl->header); while(node) { // 基于第0層數(shù)據(jù)釋放,也基于第0層做迭代,直到刪除完成 // 因為其他層數(shù)據(jù)都是引用的第0層的數(shù)據(jù),所以釋放時無需關(guān)注 next = node->level[0].forward; zslFreeNode(node); node = next; } zfree(zsl);}// t_zset 也很簡單,只是把 node.ele 釋放掉,再把自身釋放到即可// 這樣的刪除方式依賴于其存儲結(jié)構(gòu),咱們后續(xù)再聊/* Free the specified skiplist node. The referenced SDS string representation * of the element is freed too, unless node->ele is set to NULL before calling * this function. */void zslFreeNode(zskiplistNode *node) { sdsfree(node->ele); zfree(node);}


        2. 異步刪除過程

        異步刪除按理說會更復(fù)雜,更有意思些。只不過我們前面已經(jīng)把核心的東西擼了個遍,這剩下的也不多了。

        // lazyfree.c, int dbAsyncDelete(redisDb *db, robj *key) {    /* Deleting an entry from the expires dict will not free the sds of     * the key, because it is shared with the main dictionary. */    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
        /* If the value is composed of a few allocations, to free in a lazy way * is actually just slower... So under a certain limit we just free * the object synchronously. */ dictEntry *de = dictFind(db->dict,key->ptr); if (de) { robj *val = dictGetVal(de); size_t free_effort = lazyfreeGetFreeEffort(val);
        /* If releasing the object is too much work, let's put it into the * lazy free list. */ // 其實異步方法與同步方法的差別在這,即要求 刪除的元素影響須大于某閥值(64) // 否則按照同步方式直接刪除,因為那樣代價更小 if (free_effort > LAZYFREE_THRESHOLD) { // 異步釋放+1,原子操作 atomicIncr(lazyfree_objects,1,&lazyfree_objects_mutex); // 將 value 的釋放添加到異步線程隊列中去,后臺處理, 任務(wù)類型為 異步釋放內(nèi)存 bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL); // 設(shè)置val為NULL, 以便在外部進(jìn)行刪除時忽略釋放value相關(guān)內(nèi)存 dictSetVal(db->dict,de,NULL); } }
        /* Release the key-val pair, or just the key if we set the val * field to NULL in order to lazy free it later. */ if (dictDelete(db->dict,key->ptr) == DICT_OK) { if (server.cluster_enabled) slotToKeyDel(key); return 1; } else { return 0; }}// bio.c, 添加異步任務(wù)到線程中, 類型由type決定,線程安全地添加// 然后嘛,后臺線程就不會停地運行了任務(wù)了void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { struct bio_job *job = zmalloc(sizeof(*job));
        job->time = time(NULL); job->arg1 = arg1; job->arg2 = arg2; job->arg3 = arg3; // 上鎖操作 pthread_mutex_lock(&bio_mutex[type]); listAddNodeTail(bio_jobs[type],job); bio_pending[type]++; // 喚醒任務(wù)線程 pthread_cond_signal(&bio_newjob_cond[type]); pthread_mutex_unlock(&bio_mutex[type]);}// bio.c, 后臺線程任務(wù)框架,總之還是有事情可做了。void *bioProcessBackgroundJobs(void *arg) { struct bio_job *job; unsigned long type = (unsigned long) arg; sigset_t sigset;
        /* Check that the type is within the right interval. */ if (type >= BIO_NUM_OPS) { serverLog(LL_WARNING, "Warning: bio thread started with wrong type %lu",type); return NULL; }
        /* Make the thread killable at any time, so that bioKillThreads() * can work reliably. */ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
        pthread_mutex_lock(&bio_mutex[type]); /* Block SIGALRM so we are sure that only the main thread will * receive the watchdog signal. */ sigemptyset(&sigset); sigaddset(&sigset, SIGALRM); if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) serverLog(LL_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); // 任務(wù)一直運行 while(1) { listNode *ln;
        /* The loop always starts with the lock hold. */ if (listLength(bio_jobs[type]) == 0) { // 注意此處將會釋放鎖喲,以便外部可以添加任務(wù)進(jìn)來 pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); continue; } /* Pop the job from the queue. */ ln = listFirst(bio_jobs[type]); job = ln->value; /* It is now possible to unlock the background system as we know have * a stand alone job structure to process.*/ pthread_mutex_unlock(&bio_mutex[type]);
        /* Process the job accordingly to its type. */ if (type == BIO_CLOSE_FILE) { close((long)job->arg1); } else if (type == BIO_AOF_FSYNC) { aof_fsync((long)job->arg1); } // 也就是這玩意了,會去處理提交過來的任務(wù) else if (type == BIO_LAZY_FREE) { /* What we free changes depending on what arguments are set: * arg1 -> free the object at pointer. * arg2 & arg3 -> free two dictionaries (a Redis DB). * only arg3 -> free the skiplist. */ // 本文介紹的刪除value形式,用第一種情況 if (job->arg1) lazyfreeFreeObjectFromBioThread(job->arg1); else if (job->arg2 && job->arg3) lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3); else if (job->arg3) lazyfreeFreeSlotsMapFromBioThread(job->arg3); } else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); } zfree(job);
        /* Unblock threads blocked on bioWaitStepOfType() if any. */ // 喚醒所有相關(guān)等待線程 pthread_cond_broadcast(&bio_step_cond[type]);
        /* Lock again before reiterating the loop, if there are no longer * jobs to process we'll block again in pthread_cond_wait(). */ pthread_mutex_lock(&bio_mutex[type]); listDelNode(bio_jobs[type],ln); bio_pending[type]--; }}
        // lazyfree.c, 和同步刪除一致了/* Release objects from the lazyfree thread. It's just decrRefCount() * updating the count of objects to release. */void lazyfreeFreeObjectFromBioThread(robj *o) { decrRefCount(o); atomicDecr(lazyfree_objects,1,&lazyfree_objects_mutex);}

        從此處redis異步處理過程,我們可以看到,redis并不是每次進(jìn)入異步時都進(jìn)行異步操作,而是在必要的時候才會進(jìn)行。這也提示我們,不要為了異步而異步,而是應(yīng)該計算利弊。

         如此,整個 del/unlink 的過程就完成了??傮w來說,刪除都是基于hash的簡單remove而已,唯一有點難度是對內(nèi)存的回收問題,這其實就是一個簡單的使用引用計數(shù)器算法實現(xiàn)的垃圾回收器應(yīng)該做的事而已。勿須多言。具體過程需依賴于數(shù)據(jù)的存儲結(jié)構(gòu),主要目的自然是遞歸釋放空間,避免內(nèi)存泄漏了。




        往期精彩推薦



        騰訊、阿里、滴滴后臺面試題匯總總結(jié) — (含答案)

        面試:史上最全多線程面試題 !

        最新阿里內(nèi)推Java后端面試題

        JVM難學(xué)?那是因為你沒認(rèn)真看完這篇文章


        END


        關(guān)注作者微信公眾號 —《JAVA爛豬皮》


        了解更多java后端架構(gòu)知識以及最新面試寶典


        你點的每個好看,我都認(rèn)真當(dāng)成了


        看完本文記得給作者點贊+在看哦~~~大家的支持,是作者源源不斷出文的動力


        作者:等你歸去來

        出處:https://www.cnblogs.com/yougewe/p/12231419.html

        瀏覽 110
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            全黄h全肉医生短篇禁乱 | 欧美成人aa免费 | 一级黄色AA片 | 三级电影在线看 | 伊人操逼av | 丰满少妇双乳被呻吟进入 | 免费看黄色A片 | 国产一级婬片A片免费妖精视频 | 少妇打野战 | 王老板会所性感美女口爆 |