Redis(七):set/sadd/sismember/sinter/sdiffstore 命令源碼解析
上兩篇我們講了hash和list數(shù)據(jù)類型相關(guān)的主要實(shí)現(xiàn)方法,同時(shí)加上前面對框架服務(wù)和string相關(guān)的功能介紹,已揭開了大部分redis的實(shí)用面紗。
現(xiàn)在還剩下兩種數(shù)據(jù)類型: set, zset.
本篇咱們繼續(xù)來看redis中的數(shù)據(jù)類型的實(shí)現(xiàn): set 相關(guān)操作實(shí)現(xiàn)。
研究過jdk的hashmap和hashset實(shí)現(xiàn)的同學(xué),肯定都是知道,set其實(shí)就是一個(gè)簡化版的map,只要將map的 k->v 的形式變?yōu)?k->1 的形式就可以了。所以set只是map的一個(gè)簡單包裝類。
同理,對于 redis的 hash 和 set 數(shù)據(jù)類型,我們是否可以得出這么個(gè)結(jié)論呢?(如果是那樣的話,我們就只需看幾個(gè)set提供的特殊功能即可)
同樣,我們從功能列表開始,到數(shù)據(jù)結(jié)構(gòu),再到具體實(shí)現(xiàn)的這么個(gè)思路,來探索redis set的實(shí)現(xiàn)吧。
零、redis set相關(guān)操作方法
Redis 的 Set 是 String 類型的無序集合。集合成員是唯一的,這就意味著集合中不能出現(xiàn)重復(fù)的數(shù)據(jù)??筛鶕?jù)應(yīng)用場景需要選用該數(shù)據(jù)類型。(比如:好友/關(guān)注/粉絲/感興趣的人/黑白名單)
從官方的手冊中可以查到相關(guān)的使用方法。
1> SADD key member1 [member2]
功能: 向集合添加一個(gè)或多個(gè)成員
返回值: 本次添加到redis的member數(shù)量(不包含已存在的member)2> SCARD key
功能: 獲取集合的成員數(shù)
返回值: set的元素?cái)?shù)量或者03> SDIFF key1 [key2]
功能: 返回給定所有集合的差集
返回值: 差集的數(shù)組列表4> SDIFFSTORE destination key1 [key2]
功能: 返回給定所有集合的差集并存儲(chǔ)在 destination 中
返回值: 差集元素個(gè)數(shù)5> SINTER key1 [key2]
功能: 返回給定所有集合的交集
返回值: 交集的數(shù)組列表6> SINTERSTORE destination key1 [key2]
功能: 返回給定所有集合的交集并存儲(chǔ)在 destination 中
返回值: 交集的元素個(gè)數(shù)7> SISMEMBER key member
功能: 判斷 member 元素是否是集合 key 的成員
返回值: 1:如果member是key的成員, 0:如果member不是key的成員或者key不存在8> SMEMBERS key
功能: 返回集合中的所有成員
返回值: 所有成員列表9> SMOVE source destination member
功能: 將 member 元素從 source 集合移動(dòng)到 destination 集合
返回值: 1:移動(dòng)操作成功, 0:移動(dòng)不成功(member不是source的成員)10> SPOP key [count]
功能: 移除并返回集合中的一個(gè)隨機(jī)元素(因?yàn)閟et是無序的)
返回值: 被移除的元素列表或者nil11> SRANDMEMBER key [count]
功能: 返回集合中一個(gè)或多個(gè)隨機(jī)數(shù)
返回值: 1個(gè)元素或者count個(gè)元素?cái)?shù)組列表或者nil12> SREM key member1 [member2]
功能: 移除集合中一個(gè)或多個(gè)成員
返回值: 實(shí)際移除的元素個(gè)數(shù)13> SUNION key1 [key2]
功能: 返回所有給定集合的并集
返回值: 并集元素?cái)?shù)組列表14> SUNIONSTORE destination key1 [key2]
功能: 所有給定集合的并集存儲(chǔ)在 destination 集合中
返回值: 并集元素個(gè)數(shù)15> SSCAN key cursor [MATCH pattern] [COUNT count]
功能: 迭代集合中的元素
返回值: 元素?cái)?shù)組列表
一、set 相關(guān)數(shù)據(jù)結(jié)構(gòu)
redis使用dict和intset 兩種數(shù)據(jù)結(jié)構(gòu)保存set數(shù)據(jù)。
// 1. inset 數(shù)據(jù)結(jié)構(gòu),在set數(shù)據(jù)量小且都是整型數(shù)據(jù)時(shí)使用typedef struct intset {// 編碼范圍,由具體存儲(chǔ)值決定uint32_t encoding;// 數(shù)組長度uint32_t length;// 具體存儲(chǔ)元素的容器int8_t contents[];} intset;// 2. dict 相關(guān)數(shù)據(jù)結(jié)構(gòu),即是 hash 的實(shí)現(xiàn)相關(guān)的數(shù)據(jù)結(jié)構(gòu)/* This is our hash table structure. Every dictionary has two of this as we* implement incremental rehashing, for the old to the new table. */typedef struct dictht {dictEntry **table;unsigned long size;unsigned long sizemask;unsigned long used;} dictht;typedef struct dict {dictType *type;void *privdata;dictht ht[2];long rehashidx; /* rehashing not in progress if rehashidx == -1 */unsigned long iterators; /* number of iterators currently running */} dict;/* If safe is set to 1 this is a safe iterator, that means, you can call* dictAdd, dictFind, and other functions against the dictionary even while* iterating. Otherwise it is a non safe iterator, and only dictNext()* should be called while iterating. */typedef struct dictIterator {dict *d;long index;int table, safe;dictEntry *entry, *nextEntry;/* unsafe iterator fingerprint for misuse detection. */long long fingerprint;} dictIterator;typedef struct dictEntry {void *key;union {void *val;uint64_t u64;int64_t s64;double d;} v;struct dictEntry *next;} dictEntry;typedef struct dictType {unsigned int (*hashFunction)(const void *key);void *(*keyDup)(void *privdata, const void *key);void *(*valDup)(void *privdata, const void *obj);int (*keyCompare)(void *privdata, const void *key1, const void *key2);void (*keyDestructor)(void *privdata, void *key);void (*valDestructor)(void *privdata, void *obj);} dictType;
對于set相關(guān)的命令的接口定義:
{"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},{"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},{"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},{"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},{"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0},{"spop",spopCommand,-2,"wRsF",0,NULL,1,1,1,0,0},{"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},{"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},{"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},{"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},{"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},{"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},{"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},{"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},{"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
二、sadd 添加成員操作
一般我們都會(huì)以添加數(shù)據(jù)開始。從而理解數(shù)據(jù)結(jié)構(gòu)的應(yīng)用。
// 用法: SADD key member1 [member2]// t_set.c, 添加membervoid saddCommand(client *c) {robj *set;int j, added = 0;// 先從當(dāng)前db中查找set實(shí)例set = lookupKeyWrite(c->db,c->argv[1]);if (set == NULL) {// 1. 新建set實(shí)例并添加到當(dāng)前db中set = setTypeCreate(c->argv[2]->ptr);dbAdd(c->db,c->argv[1],set);} else {if (set->type != OBJ_SET) {addReply(c,shared.wrongtypeerr);return;}}// 對于n個(gè)member,一個(gè)個(gè)地添加即可for (j = 2; j < c->argc; j++) {// 2. 只有添加成功, added 才會(huì)加1if (setTypeAdd(set,c->argv[j]->ptr)) added++;}// 命令傳播if (added) {signalModifiedKey(c->db,c->argv[1]);notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);}server.dirty += added;// 響應(yīng)添加成功的數(shù)量addReplyLongLong(c,added);}// 1. 創(chuàng)建新的set集合實(shí)例(需根據(jù)首次的參數(shù)類型判定)// t_set.c, 創(chuàng)建set實(shí)例/* Factory method to return a set that *can* hold "value". When the object has* an integer-encodable value, an intset will be returned. Otherwise a regular* hash table. */robj *setTypeCreate(sds value) {// 如果傳入的value是整型,則創(chuàng)建 intset 類型的set// 否則使用dict類型的set// 一般地,第一個(gè)數(shù)據(jù)為整型,后續(xù)數(shù)據(jù)也應(yīng)該為整型,所以這個(gè)數(shù)據(jù)結(jié)構(gòu)相對穩(wěn)定// 而hash的容器創(chuàng)建時(shí),只使用了一 ziplist 創(chuàng)建,這是不一樣的實(shí)現(xiàn)if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)return createIntsetObject();return createSetObject();}// 1.1. 創(chuàng)建 intset 型的set// object.crobj *createIntsetObject(void) {intset *is = intsetNew();robj *o = createObject(OBJ_SET,is);o->encoding = OBJ_ENCODING_INTSET;return o;}// intset.c, new一個(gè)空的intset對象/* Create an empty intset. */intset *intsetNew(void) {intset *is = zmalloc(sizeof(intset));is->encoding = intrev32ifbe(INTSET_ENC_INT16);is->length = 0;return is;}// 1.2. 創(chuàng)建dict 型的setrobj *createSetObject(void) {dict *d = dictCreate(&setDictType,NULL);robj *o = createObject(OBJ_SET,d);o->encoding = OBJ_ENCODING_HT;return o;}// dict.c/* Create a new hash table */dict *dictCreate(dictType *type,void *privDataPtr){dict *d = zmalloc(sizeof(*d));_dictInit(d,type,privDataPtr);return d;}/* Initialize the hash table */int _dictInit(dict *d, dictType *type,void *privDataPtr){_dictReset(&d->ht[0]);_dictReset(&d->ht[1]);d->type = type;d->privdata = privDataPtr;d->rehashidx = -1;d->iterators = 0;return DICT_OK;}// 2. 添加member到set集合中// t_set.c, 添加元素/* Add the specified value into a set.** If the value was already member of the set, nothing is done and 0 is* returned, otherwise the new element is added and 1 is returned. */int setTypeAdd(robj *subject, sds value) {long long llval;// 2.1. HT編碼和INTSET編碼分別處理就好if (subject->encoding == OBJ_ENCODING_HT) {dict *ht = subject->ptr;// 以 value 為 key, 添加實(shí)例到ht中// 實(shí)現(xiàn)過程也很簡單,大概就是如果存在則返回NULL(即無需添加),輔助rehash,分配內(nèi)存創(chuàng)建dictEntry實(shí)例,稍后簡單看看dictEntry *de = dictAddRaw(ht,value);if (de) {// 重新設(shè)置key為 sdsdup(value), value為NULLdictSetKey(ht,de,sdsdup(value));dictSetVal(ht,de,NULL);return 1;}}// 2.2. intset 編碼的member添加else if (subject->encoding == OBJ_ENCODING_INTSET) {// 嘗試解析value為 long 型,值寫入 llval 中if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {uint8_t success = 0;// 情況1. 可添加到intset中subject->ptr = intsetAdd(subject->ptr,llval,&success);if (success) {/* Convert to regular set when the intset contains* too many entries. */// 默認(rèn): 512, intset大于之后,則轉(zhuǎn)換為ht hash表模式存儲(chǔ)if (intsetLen(subject->ptr) > server.set_max_intset_entries)// 2.3. 轉(zhuǎn)換intset編碼為 ht 編碼setTypeConvert(subject,OBJ_ENCODING_HT);return 1;}} else {// 情況2. member 是字符串型,先將set容器轉(zhuǎn)換為 ht 編碼,再重新執(zhí)行dict的添加模式/* Failed to get integer from object, convert to regular set. */setTypeConvert(subject,OBJ_ENCODING_HT);/* The set *was* an intset and this value is not integer* encodable, so dictAdd should always work. */serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);return 1;}} else {serverPanic("Unknown set encoding");}return 0;}// 2.1. 添加member到dict中(略解, 在hash數(shù)據(jù)結(jié)構(gòu)解析中已介紹)// dict.c, 添加某key到 d 字典中/* Low level add. This function adds the entry but instead of setting* a value returns the dictEntry structure to the user, that will make* sure to fill the value field as he wishes.** This function is also directly exposed to the user API to be called* mainly in order to store non-pointers inside the hash value, example:** entry = dictAddRaw(dict,mykey);* if (entry != NULL) dictSetSignedIntegerVal(entry,1000);** Return values:** If key already exists NULL is returned.* If key was added, the hash entry is returned to be manipulated by the caller.*/dictEntry *dictAddRaw(dict *d, void *key){int index;dictEntry *entry;dictht *ht;if (dictIsRehashing(d)) _dictRehashStep(d);/* Get the index of the new element, or -1 if* the element already exists. */// 獲取需要添加的key的存放位置下標(biāo)(slot), 如果該key已存在, 則返回-1(無可用slot)if ((index = _dictKeyIndex(d, key)) == -1)return NULL;/* Allocate the memory and store the new entry.* Insert the element in top, with the assumption that in a database* system it is more likely that recently added entries are accessed* more frequently. */ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];entry = zmalloc(sizeof(*entry));entry->next = ht->table[index];ht->table[index] = entry;ht->used++;/* Set the hash entry fields. */dictSetKey(d, entry, key);return entry;}// 2.2. 添加整型數(shù)據(jù)到 intset中// intset.c, 添加value/* Insert an integer in the intset */intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {// 獲取value的所屬范圍uint8_t valenc = _intsetValueEncoding(value);uint32_t pos;if (success) *success = 1;/* Upgrade encoding if necessary. If we need to upgrade, we know that* this value should be either appended (if > 0) or prepended (if < 0),* because it lies outside the range of existing values. */// 默認(rèn) is->encoding 為 INTSET_ENC_INT16 (16位長)// 2.2.1. 即超過當(dāng)前預(yù)設(shè)的位長,則需要增大預(yù)設(shè),然后添加// 此時(shí)的value可以確定: 要么是最大,要么是最小 (所以我們可以推斷,此intset應(yīng)該是有序的)if (valenc > intrev32ifbe(is->encoding)) {/* This always succeeds, so we don't need to curry *success. */return intsetUpgradeAndAdd(is,value);} else {/* Abort if the value is already present in the set.* This call will populate "pos" with the right position to insert* the value when it cannot be found. */// 2.2.2. 在當(dāng)前環(huán)境下添加value// 找到value則說明元素已存在,不可再添加// pos 保存比value小的第1個(gè)元素的位置if (intsetSearch(is,value,&pos)) {if (success) *success = 0;return is;}is = intsetResize(is,intrev32ifbe(is->length)+1);// 在pos不是末尾位置時(shí),需要留出空位,依次移動(dòng)后面的元素if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);}// 針對編碼位不變更的情況下設(shè)置pos位置的值_intsetSet(is,pos,value);is->length = intrev32ifbe(intrev32ifbe(is->length)+1);return is;}// 判斷 value 的位長// INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64// 2 < 4 < 8/* Return the required encoding for the provided value. */static uint8_t _intsetValueEncoding(int64_t v) {if (v < INT32_MIN || v > INT32_MAX)return INTSET_ENC_INT64;else if (v < INT16_MIN || v > INT16_MAX)return INTSET_ENC_INT32;elsereturn INTSET_ENC_INT16;}// 2.2.1. 升級(jí)預(yù)設(shè)位長,并添加value// intset.c/* Upgrades the intset to a larger encoding and inserts the given integer. */static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {uint8_t curenc = intrev32ifbe(is->encoding);uint8_t newenc = _intsetValueEncoding(value);int length = intrev32ifbe(is->length);int prepend = value < 0 ? 1 : 0;/* First set new encoding and resize */is->encoding = intrev32ifbe(newenc);// 每次必進(jìn)行擴(kuò)容is = intsetResize(is,intrev32ifbe(is->length)+1);/* Upgrade back-to-front so we don't overwrite values.* Note that the "prepend" variable is used to make sure we have an empty* space at either the beginning or the end of the intset. */// 因編碼發(fā)生變化,元素的位置已經(jīng)不能一一對應(yīng),需要按照原來的編碼依次轉(zhuǎn)移過來// 從后往前依次賦值,所以,內(nèi)存位置上不存在覆蓋問題(后面內(nèi)存位置一定是空的),直接依次賦值即可(高效復(fù)制)while(length--)_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));/* Set the value at the beginning or the end. */// 對新增加的元素,負(fù)數(shù)添加到第0位,否則添加到最后一個(gè)元素后一位if (prepend)_intsetSet(is,0,value);else_intsetSet(is,intrev32ifbe(is->length),value);is->length = intrev32ifbe(intrev32ifbe(is->length)+1);return is;}/* Resize the intset */static intset *intsetResize(intset *is, uint32_t len) {uint32_t size = len*intrev32ifbe(is->encoding);// mallocis = zrealloc(is,sizeof(intset)+size);return is;}// intset.c, 獲取pos位置的值/* Return the value at pos, given an encoding. */static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {int64_t v64;int32_t v32;int16_t v16;if (enc == INTSET_ENC_INT64) {memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));memrev64ifbe(&v64);return v64;} else if (enc == INTSET_ENC_INT32) {memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));memrev32ifbe(&v32);return v32;} else {memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));memrev16ifbe(&v16);return v16;}}// intset.c, 設(shè)置pos位置的值,和數(shù)組賦值的實(shí)際意義差不多// 只是這里數(shù)據(jù)類型是不確定的,所以使用指針進(jìn)行賦值/* Set the value at pos, using the configured encoding. */static void _intsetSet(intset *is, int pos, int64_t value) {uint32_t encoding = intrev32ifbe(is->encoding);if (encoding == INTSET_ENC_INT64) {((int64_t*)is->contents)[pos] = value;memrev64ifbe(((int64_t*)is->contents)+pos);} else if (encoding == INTSET_ENC_INT32) {((int32_t*)is->contents)[pos] = value;memrev32ifbe(((int32_t*)is->contents)+pos);} else {((int16_t*)is->contents)[pos] = value;memrev16ifbe(((int16_t*)is->contents)+pos);}}// 2.2.2. 在編碼類型未變更的情況,需要查找可以存放value的位置(為了確認(rèn)該value是否已存在,以及小于value的第一個(gè)位置賦值)/* Search for the position of "value". Return 1 when the value was found and* sets "pos" to the position of the value within the intset. Return 0 when* the value is not present in the intset and sets "pos" to the position* where "value" can be inserted. */static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;int64_t cur = -1;/* The value can never be found when the set is empty */if (intrev32ifbe(is->length) == 0) {if (pos) *pos = 0;return 0;} else {/* Check for the case where we know we cannot find the value,* but do know the insert position. */// 因 intset 是有序數(shù)組,即可以判定是否超出范圍,如果超出則元素必定不存在if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {if (pos) *pos = intrev32ifbe(is->length);return 0;} else if (value < _intsetGet(is,0)) {if (pos) *pos = 0;return 0;}}// 使用二分查找while(max >= min) {mid = ((unsigned int)min + (unsigned int)max) >> 1;cur = _intsetGet(is,mid);if (value > cur) {min = mid+1;} else if (value < cur) {max = mid-1;} else {// 找到了break;}}if (value == cur) {if (pos) *pos = mid;return 1;} else {// 在沒有找到的情況下,min就是第一個(gè)比 value 小的元素if (pos) *pos = min;return 0;}}// intset移動(dòng)(內(nèi)存移動(dòng))static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {void *src, *dst;uint32_t bytes = intrev32ifbe(is->length)-from;uint32_t encoding = intrev32ifbe(is->encoding);if (encoding == INTSET_ENC_INT64) {src = (int64_t*)is->contents+from;dst = (int64_t*)is->contents+to;bytes *= sizeof(int64_t);} else if (encoding == INTSET_ENC_INT32) {src = (int32_t*)is->contents+from;dst = (int32_t*)is->contents+to;bytes *= sizeof(int32_t);} else {src = (int16_t*)is->contents+from;dst = (int16_t*)is->contents+to;bytes *= sizeof(int16_t);}memmove(dst,src,bytes);}// 2.3. 轉(zhuǎn)換intset編碼為 ht 編碼 (如果遇到string型的value或者intset數(shù)量大于閥值(默認(rèn):512)時(shí))// t_set.c, 類型轉(zhuǎn)換/* Convert the set to specified encoding. The resulting dict (when converting* to a hash table) is presized to hold the number of elements in the original* set. */void setTypeConvert(robj *setobj, int enc) {setTypeIterator *si;// 要求外部必須保證 set類型且 intset 編碼serverAssertWithInfo(NULL,setobj,setobj->type == OBJ_SET &&setobj->encoding == OBJ_ENCODING_INTSET);if (enc == OBJ_ENCODING_HT) {int64_t intele;// 直接創(chuàng)建一個(gè) dict 來容納數(shù)據(jù)dict *d = dictCreate(&setDictType,NULL);sds element;/* Presize the dict to avoid rehashing */// 直接一次性擴(kuò)容成需要的大小dictExpand(d,intsetLen(setobj->ptr));/* To add the elements we extract integers and create redis objects */// setTypeIterator 迭代器是轉(zhuǎn)換的關(guān)鍵si = setTypeInitIterator(setobj);while (setTypeNext(si,&element,&intele) != -1) {// element:ht編碼時(shí)的key, intele: intset編碼時(shí)的valueelement = sdsfromlonglong(intele);// 因set特性保證是無重復(fù)元素,所以添加dict時(shí),必然應(yīng)成功// 此處應(yīng)無 rehash, 而是直接計(jì)算 hashCode, 放置元素, 時(shí)間復(fù)雜度 O(1)serverAssert(dictAdd(d,element,NULL) == DICT_OK);}// 釋放迭代器setTypeReleaseIterator(si);setobj->encoding = OBJ_ENCODING_HT;zfree(setobj->ptr);setobj->ptr = d;} else {serverPanic("Unsupported set conversion");}}// t_set.c, 獲取set集合的迭代器setTypeIterator *setTypeInitIterator(robj *subject) {setTypeIterator *si = zmalloc(sizeof(setTypeIterator));// 設(shè)置迭代器公用信息si->subject = subject;si->encoding = subject->encoding;// hash表則需要再迭代 dictif (si->encoding == OBJ_ENCODING_HT) {si->di = dictGetIterator(subject->ptr);}// intset 比較簡單,直接設(shè)置下標(biāo)即可else if (si->encoding == OBJ_ENCODING_INTSET) {si->ii = 0;} else {serverPanic("Unknown set encoding");}return si;}// dict.c, dict迭代器初始化dictIterator *dictGetIterator(dict *d){dictIterator *iter = zmalloc(sizeof(*iter));iter->d = d;iter->table = 0;iter->index = -1;iter->safe = 0;iter->entry = NULL;iter->nextEntry = NULL;return iter;}// t_set.c,/* Move to the next entry in the set. Returns the object at the current* position.** Since set elements can be internally be stored as SDS strings or* simple arrays of integers, setTypeNext returns the encoding of the* set object you are iterating, and will populate the appropriate pointer* (sdsele) or (llele) accordingly.** Note that both the sdsele and llele pointers should be passed and cannot* be NULL since the function will try to defensively populate the non* used field with values which are easy to trap if misused.** When there are no longer elements -1 is returned. */int setTypeNext(setTypeIterator *si, sds *sdsele, int64_t *llele) {// hash表返回keyif (si->encoding == OBJ_ENCODING_HT) {dictEntry *de = dictNext(si->di);if (de == NULL) return -1;*sdsele = dictGetKey(de);*llele = -123456789; /* Not needed. Defensive. */}// intset 直接獲取下標(biāo)對應(yīng)的元素即可else if (si->encoding == OBJ_ENCODING_INTSET) {if (!intsetGet(si->subject->ptr,si->ii++,llele))return -1;*sdsele = NULL; /* Not needed. Defensive. */} else {serverPanic("Wrong set encoding in setTypeNext");}return si->encoding;}// case1: intset直接疊加下標(biāo)即可// intset.c/* Sets the value to the value at the given position. When this position is* out of range the function returns 0, when in range it returns 1. */uint8_t intsetGet(intset *is, uint32_t pos, int64_t *value) {if (pos < intrev32ifbe(is->length)) {*value = _intsetGet(is,pos);return 1;}return 0;}/* Return the value at pos, using the configured encoding. */static int64_t _intsetGet(intset *is, int pos) {return _intsetGetEncoded(is,pos,intrev32ifbe(is->encoding));}/* Return the value at pos, given an encoding. */static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {int64_t v64;int32_t v32;int16_t v16;if (enc == INTSET_ENC_INT64) {memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));memrev64ifbe(&v64);return v64;} else if (enc == INTSET_ENC_INT32) {memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));memrev32ifbe(&v32);return v32;} else {memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));memrev16ifbe(&v16);return v16;}}// (附帶)case2: dict的迭代// dict.c, dict的迭代,存疑問dictEntry *dictNext(dictIterator *iter){// 一直迭代查找while (1) {// iter->entry 為NULL, 有兩種可能: 1. 初始化時(shí); 2. 上一元素為迭代完成(hash沖突)if (iter->entry == NULL) {dictht *ht = &iter->d->ht[iter->table];if (iter->index == -1 && iter->table == 0) {if (iter->safe)iter->d->iterators++;elseiter->fingerprint = dictFingerprint(iter->d);}// 直接使用下標(biāo)進(jìn)行迭代,如果中間有空閑位置該如何處理??// 看起來redis是使用了全量迭代元素的處理辦法,即有可能有許多空迭代過程// 一般地,也是進(jìn)行兩層迭代,jdk的hashmap迭代實(shí)現(xiàn)為直接找到下一次非空的元素為止iter->index++;// 直到迭代完成所有元素,否則會(huì)直到找到一個(gè)元素為止if (iter->index >= (long) ht->size) {if (dictIsRehashing(iter->d) && iter->table == 0) {iter->table++;iter->index = 0;ht = &iter->d->ht[1];} else {break;}}iter->entry = ht->table[iter->index];} else {// entry不為空,就一定有nextEntry??iter->entry = iter->nextEntry;}// 如果當(dāng)前entry為空,則繼續(xù)迭代下一個(gè) indexif (iter->entry) {/* We need to save the 'next' here, the iterator user* may delete the entry we are returning. */iter->nextEntry = iter->entry->next;return iter->entry;}}return NULL;}
其實(shí)sadd過程非常簡單。與hash的實(shí)現(xiàn)方式只是在 dict 上的操作是一致的,但本質(zhì)上是不一樣的。我們通過一個(gè)時(shí)序圖整體看一下:

三、sismember 元素查找操作
由于set本身的特性決定,它不會(huì)有許多查詢功能也沒必要提供豐富的查詢功用。所以只能先挑這個(gè)來看看了。要確定一個(gè)元素是不是其成員,無非就是一個(gè)比較的過程。
// 用法: SISMEMBER key member// t_set.c,void sismemberCommand(client *c) {robj *set;if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||checkType(c,set,OBJ_SET)) return;// 主要方法 setTypeIsMemberif (setTypeIsMember(set,c->argv[2]->ptr))// 回復(fù)1addReply(c,shared.cone);else// 回復(fù)0addReply(c,shared.czero);}// t_set.cint setTypeIsMember(robj *subject, sds value) {long long llval;if (subject->encoding == OBJ_ENCODING_HT) {// hash 表的查找方式,hashCode 計(jì)算,鏈表查找,就這么簡單return dictFind((dict*)subject->ptr,value) != NULL;} else if (subject->encoding == OBJ_ENCODING_INTSET) {// 如果當(dāng)前的set集合是 intset 編碼的,則只有查找值也是整型的情況下才可能查找到元素if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {// intset 查找,而且 intset 是有序的,所以直接使用二分查找即可return intsetFind((intset*)subject->ptr,llval);}} else {serverPanic("Unknown set encoding");}return 0;}/* Determine whether a value belongs to this set */uint8_t intsetFind(intset *is, int64_t value) {uint8_t valenc = _intsetValueEncoding(value);// 最大范圍檢查,加二分查找// intsetSearch 前面已介紹return valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,NULL);}
四、sinter 集合交集獲取
兩個(gè)set的數(shù)據(jù)集取交集,也是要看使用場景吧。(比如獲取共同的好友)
在看redis的實(shí)現(xiàn)之前,我們可以自己先想想,如何實(shí)現(xiàn)兩個(gè)集合次問題?(算法題)我只能想到無腦地兩重迭代加hash的方式。你呢?
// 用法: SINTER key1 [key2]// t_set.c, sinter 實(shí)現(xiàn)void sinterCommand(client *c) {// 第三個(gè)參數(shù)是用來存儲(chǔ) 交集結(jié)果的,兩段代碼已做復(fù)用,說明存儲(chǔ)過程還是比較簡單的sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);}// t_set.c, 求n個(gè)key的集合交集void sinterGenericCommand(client *c, robj **setkeys,unsigned long setnum, robj *dstkey) {robj **sets = zmalloc(sizeof(robj*)*setnum);setTypeIterator *si;robj *dstset = NULL;sds elesds;int64_t intobj;void *replylen = NULL;unsigned long j, cardinality = 0;int encoding;for (j = 0; j < setnum; j++) {// 依次查找每個(gè)key的set實(shí)例robj *setobj = dstkey ?lookupKeyWrite(c->db,setkeys[j]) :lookupKeyRead(c->db,setkeys[j]);// 只要有一個(gè)set為空,則交集必定為為,無需再找if (!setobj) {zfree(sets);if (dstkey) {// 沒有交集,直接將dstKey 刪除,注意此邏輯??if (dbDelete(c->db,dstkey)) {signalModifiedKey(c->db,dstkey);server.dirty++;}addReply(c,shared.czero);} else {addReply(c,shared.emptymultibulk);}return;}if (checkType(c,setobj,OBJ_SET)) {zfree(sets);return;}sets[j] = setobj;}/* Sort sets from the smallest to largest, this will improve our* algorithm's performance */// 快速排序算法,將 sets 按照元素長度做排序,使最少元素的set排在最前面qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);/* The first thing we should output is the total number of elements...* since this is a multi-bulk write, but at this stage we don't know* the intersection set size, so we use a trick, append an empty object* to the output list and save the pointer to later modify it with the* right length */if (!dstkey) {replylen = addDeferredMultiBulkLength(c);} else {/* If we have a target key where to store the resulting set* create this key with an empty set inside */dstset = createIntsetObject();}/* Iterate all the elements of the first (smallest) set, and test* the element against all the other sets, if at least one set does* not include the element it is discarded */// 看來redis也是直接通過迭代的方式來完成交集功能// 迭代最少的set集合,依次查找后續(xù)的set集合,當(dāng)遇到一個(gè)不存在的set時(shí),上值被排除,否則是交集si = setTypeInitIterator(sets[0]);while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) {for (j = 1; j < setnum; j++) {if (sets[j] == sets[0]) continue;// 以下是查找過程// 分 hash表查找 和 intset 編碼查找if (encoding == OBJ_ENCODING_INTSET) {/* intset with intset is simple... and fast */// 兩個(gè)集合都是 intset 編碼,直接二分查找即可if (sets[j]->encoding == OBJ_ENCODING_INTSET &&!intsetFind((intset*)sets[j]->ptr,intobj)){break;/* in order to compare an integer with an object we* have to use the generic function, creating an object* for this */} else if (sets[j]->encoding == OBJ_ENCODING_HT) {// 編碼不一致,但元素可能相同// setTypeIsMember 復(fù)用前面的代碼,直接查找即可elesds = sdsfromlonglong(intobj);if (!setTypeIsMember(sets[j],elesds)) {sdsfree(elesds);break;}sdsfree(elesds);}} else if (encoding == OBJ_ENCODING_HT) {if (!setTypeIsMember(sets[j],elesds)) {break;}}}/* Only take action when all sets contain the member */// 當(dāng)?shù)晁屑希f明每個(gè)set中都存在該值,是交集(注意分析最后一個(gè)迭代)if (j == setnum) {// 不存儲(chǔ)交集的情況下,直接響應(yīng)元素值即可if (!dstkey) {if (encoding == OBJ_ENCODING_HT)addReplyBulkCBuffer(c,elesds,sdslen(elesds));elseaddReplyBulkLongLong(c,intobj);cardinality++;}// 要存儲(chǔ)交集數(shù)據(jù),將值存儲(chǔ)到 dstset 中else {if (encoding == OBJ_ENCODING_INTSET) {elesds = sdsfromlonglong(intobj);setTypeAdd(dstset,elesds);sdsfree(elesds);} else {setTypeAdd(dstset,elesds);}}}}setTypeReleaseIterator(si);if (dstkey) {/* Store the resulting set into the target, if the intersection* is not an empty set. */// 存儲(chǔ)集合之前會(huì)先把原來的數(shù)據(jù)刪除,如果進(jìn)行多次交集運(yùn)算,dstKey 就相當(dāng)于臨時(shí)表咯int deleted = dbDelete(c->db,dstkey);if (setTypeSize(dstset) > 0) {dbAdd(c->db,dstkey,dstset);addReplyLongLong(c,setTypeSize(dstset));notifyKeyspaceEvent(NOTIFY_SET,"sinterstore",dstkey,c->db->id);} else {decrRefCount(dstset);addReply(c,shared.czero);if (deleted)notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);}signalModifiedKey(c->db,dstkey);server.dirty++;} else {setDeferredMultiBulkLength(c,replylen,cardinality);}zfree(sets);}// compare 方法int qsortCompareSetsByCardinality(const void *s1, const void *s2) {return setTypeSize(*(robj**)s1)-setTypeSize(*(robj**)s2);}// 快排樣例 sort.lua-- extracted from Programming Pearls, page 110function qsort(x,l,u,f)if l<u thenlocal m=math.random(u-(l-1))+l-1 -- choose a random pivot in range l..ux[l],x[m]=x[m],x[l] -- swap pivot to first positionlocal t=x[l] -- pivot valuem=llocal i=l+1while i<=u do-- invariant: x[l+1..m] < t <= x[m+1..i-1]if f(x[i],t) thenm=m+1x[m],x[i]=x[i],x[m] -- swap x[i] and x[m]endi=i+1endx[l],x[m]=x[m],x[l] -- swap pivot to a valid place-- x[l+1..m-1] < x[m] <= x[m+1..u]qsort(x,l,m-1,f)qsort(x,m+1,u,f)endend
sinter 看起來就是一個(gè)算法題嘛。
五、sdiffstore 差集處理
sinter交集是一算法題,那么sdiff差集應(yīng)該也就是一道算法題而已。確認(rèn)下:
// 用法: SDIFFSTORE destination key1 [key2]// t_set.cvoid sdiffstoreCommand(client *c) {// 看起來sdiff 與 sunion 共用了一段代碼,為啥呢?// 想想 sql 中的 full join// c->argv[1] 是 dstKeysunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],SET_OP_DIFF);}// t_set.c, 差集并集運(yùn)算void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,robj *dstkey, int op) {robj **sets = zmalloc(sizeof(robj*)*setnum);setTypeIterator *si;robj *dstset = NULL;sds ele;int j, cardinality = 0;int diff_algo = 1;// 同樣的套路,先查找各key的實(shí)例// 不同的是,這里的key允許不存在,但不允許類型不一致for (j = 0; j < setnum; j++) {robj *setobj = dstkey ?lookupKeyWrite(c->db,setkeys[j]) :lookupKeyRead(c->db,setkeys[j]);if (!setobj) {sets[j] = NULL;continue;}if (checkType(c,setobj,OBJ_SET)) {zfree(sets);return;}sets[j] = setobj;}/* Select what DIFF algorithm to use.** Algorithm 1 is O(N*M) where N is the size of the element first set* and M the total number of sets.** Algorithm 2 is O(N) where N is the total number of elements in all* the sets.** We compute what is the best bet with the current input here. */// 針對差集運(yùn)算,做算法優(yōu)化if (op == SET_OP_DIFF && sets[0]) {long long algo_one_work = 0, algo_two_work = 0;for (j = 0; j < setnum; j++) {if (sets[j] == NULL) continue;algo_one_work += setTypeSize(sets[0]);algo_two_work += setTypeSize(sets[j]);}/* Algorithm 1 has better constant times and performs less operations* if there are elements in common. Give it some advantage. */algo_one_work /= 2;diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2;if (diff_algo == 1 && setnum > 1) {/* With algorithm 1 it is better to order the sets to subtract* by decreasing size, so that we are more likely to find* duplicated elements ASAP. */qsort(sets+1,setnum-1,sizeof(robj*),qsortCompareSetsByRevCardinality);}}/* We need a temp set object to store our union. If the dstkey* is not NULL (that is, we are inside an SUNIONSTORE operation) then* this set object will be the resulting object to set into the target key*/dstset = createIntsetObject();if (op == SET_OP_UNION) {/* Union is trivial, just add every element of every set to the* temporary set. */for (j = 0; j < setnum; j++) {if (!sets[j]) continue; /* non existing keys are like empty sets */// 依次添加即可,對于 sunion 來說,有序是無意義的si = setTypeInitIterator(sets[j]);while((ele = setTypeNextObject(si)) != NULL) {if (setTypeAdd(dstset,ele)) cardinality++;sdsfree(ele);}setTypeReleaseIterator(si);}}// 使用算法1, 依次迭代最大元素else if (op == SET_OP_DIFF && sets[0] && diff_algo == 1) {/* DIFF Algorithm 1:** We perform the diff by iterating all the elements of the first set,* and only adding it to the target set if the element does not exist* into all the other sets.** This way we perform at max N*M operations, where N is the size of* the first set, and M the number of sets. */si = setTypeInitIterator(sets[0]);while((ele = setTypeNextObject(si)) != NULL) {for (j = 1; j < setnum; j++) {if (!sets[j]) continue; /* no key is an empty set. */if (sets[j] == sets[0]) break; /* same set! */// 只要有一個(gè)相同,就不算是差集??if (setTypeIsMember(sets[j],ele)) break;}// 這里的差集是所有set的值都不相同或者為空???尷尬了if (j == setnum) {/* There is no other set with this element. Add it. */setTypeAdd(dstset,ele);cardinality++;}sdsfree(ele);}setTypeReleaseIterator(si);}// 使用算法2,直接以第一個(gè)元素為基礎(chǔ),后續(xù)set做remove,最后剩下的就是差集else if (op == SET_OP_DIFF && sets[0] && diff_algo == 2) {/* DIFF Algorithm 2:** Add all the elements of the first set to the auxiliary set.* Then remove all the elements of all the next sets from it.** This is O(N) where N is the sum of all the elements in every* set. */for (j = 0; j < setnum; j++) {if (!sets[j]) continue; /* non existing keys are like empty sets */si = setTypeInitIterator(sets[j]);while((ele = setTypeNextObject(si)) != NULL) {if (j == 0) {if (setTypeAdd(dstset,ele)) cardinality++;} else {if (setTypeRemove(dstset,ele)) cardinality--;}sdsfree(ele);}setTypeReleaseIterator(si);/* Exit if result set is empty as any additional removal* of elements will have no effect. */if (cardinality == 0) break;}}/* Output the content of the resulting set, if not in STORE mode */if (!dstkey) {addReplyMultiBulkLen(c,cardinality);si = setTypeInitIterator(dstset);// 響應(yīng)差集列表while((ele = setTypeNextObject(si)) != NULL) {addReplyBulkCBuffer(c,ele,sdslen(ele));sdsfree(ele);}setTypeReleaseIterator(si);decrRefCount(dstset);} else {/* If we have a target key where to store the resulting set* create this key with the result set inside */int deleted = dbDelete(c->db,dstkey);if (setTypeSize(dstset) > 0) {// 存儲(chǔ)差集列表,響應(yīng)差集個(gè)數(shù)dbAdd(c->db,dstkey,dstset);addReplyLongLong(c,setTypeSize(dstset));notifyKeyspaceEvent(NOTIFY_SET,op == SET_OP_UNION ? "sunionstore" : "sdiffstore",dstkey,c->db->id);} else {decrRefCount(dstset);addReply(c,shared.czero);if (deleted)notifyKeyspaceEvent(NOTIFY_GENERIC,"del",dstkey,c->db->id);}signalModifiedKey(c->db,dstkey);server.dirty++;}zfree(sets);}/* This is used by SDIFF and in this case we can receive NULL that should* be handled as empty sets. */int qsortCompareSetsByRevCardinality(const void *s1, const void *s2) {robj *o1 = *(robj**)s1, *o2 = *(robj**)s2;return (o2 ? setTypeSize(o2) : 0) - (o1 ? setTypeSize(o1) : 0);}
額,這個(gè)差集的定義好像過于簡單了,以至于實(shí)現(xiàn)都不復(fù)雜。
六、spop 獲取一個(gè)元素
前面講的基本都是增、查,雖然不存在改,但是還是可以簡單看一下刪掉操作。spop有兩個(gè)作用,一、獲取1或n個(gè)元素,二、刪除1或n個(gè)元素。
// 用法: SPOP key [count]// t_set.cvoid spopCommand(client *c) {robj *set, *ele, *aux;sds sdsele;int64_t llele;int encoding;if (c->argc == 3) {// 彈出指定數(shù)量的元素,略spopWithCountCommand(c);return;} else if (c->argc > 3) {addReply(c,shared.syntaxerr);return;}/* Make sure a key with the name inputted exists, and that it's type is* indeed a set */if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||checkType(c,set,OBJ_SET)) return;/* Get a random element from the set */// 1. 隨機(jī)獲取一個(gè)元素,這是 spop 的定義encoding = setTypeRandomElement(set,&sdsele,&llele);/* Remove the element from the set */// 2. 刪除元素if (encoding == OBJ_ENCODING_INTSET) {ele = createStringObjectFromLongLong(llele);set->ptr = intsetRemove(set->ptr,llele,NULL);} else {ele = createStringObject(sdsele,sdslen(sdsele));setTypeRemove(set,ele->ptr);}notifyKeyspaceEvent(NOTIFY_SET,"spop",c->argv[1],c->db->id);/* Replicate/AOF this command as an SREM operation */aux = createStringObject("SREM",4);rewriteClientCommandVector(c,3,aux,c->argv[1],ele);decrRefCount(aux);/* Add the element to the reply */addReplyBulk(c,ele);decrRefCount(ele);/* Delete the set if it's empty */if (setTypeSize(set) == 0) {dbDelete(c->db,c->argv[1]);notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);}/* Set has been modified */signalModifiedKey(c->db,c->argv[1]);server.dirty++;}// 沒啥好說的,就看下是如何隨機(jī)的就好了// t_set.c, 隨機(jī)獲取一個(gè)元素,賦值給 sdsele|llele/* Return random element from a non empty set.* The returned element can be a int64_t value if the set is encoded* as an "intset" blob of integers, or an SDS string if the set* is a regular set.** The caller provides both pointers to be populated with the right* object. The return value of the function is the object->encoding* field of the object and is used by the caller to check if the* int64_t pointer or the redis object pointer was populated.** Note that both the sdsele and llele pointers should be passed and cannot* be NULL since the function will try to defensively populate the non* used field with values which are easy to trap if misused. */int setTypeRandomElement(robj *setobj, sds *sdsele, int64_t *llele) {if (setobj->encoding == OBJ_ENCODING_HT) {// 1.1. dict 型的隨機(jī)dictEntry *de = dictGetRandomKey(setobj->ptr);*sdsele = dictGetKey(de);*llele = -123456789; /* Not needed. Defensive. */} else if (setobj->encoding == OBJ_ENCODING_INTSET) {// 1.2. intset 型的隨機(jī)*llele = intsetRandom(setobj->ptr);*sdsele = NULL; /* Not needed. Defensive. */} else {serverPanic("Unknown set encoding");}return setobj->encoding;}// 1.1. dict 型的隨機(jī)/* Return a random entry from the hash table. Useful to* implement randomized algorithms */dictEntry *dictGetRandomKey(dict *d){dictEntry *he, *orighe;unsigned int h;int listlen, listele;if (dictSize(d) == 0) return NULL;if (dictIsRehashing(d)) _dictRehashStep(d);// 基本原理就是一直接隨機(jī)獲取下標(biāo),直到有值if (dictIsRehashing(d)) {do {/* We are sure there are no elements in indexes from 0* to rehashidx-1 */// 獲取隨機(jī)下標(biāo),須保證在 兩個(gè)hash表的范圍內(nèi)h = d->rehashidx + (random() % (d->ht[0].size +d->ht[1].size -d->rehashidx));he = (h >= d->ht[0].size) ? d->ht[1].table[h - d->ht[0].size] :d->ht[0].table[h];} while(he == NULL);} else {do {h = random() & d->ht[0].sizemask;he = d->ht[0].table[h];} while(he == NULL);}/* Now we found a non empty bucket, but it is a linked* list and we need to get a random element from the list.* The only sane way to do so is counting the elements and* select a random index. */listlen = 0;orighe = he;// 對于hash沖突情況,再隨機(jī)一次while(he) {he = he->next;listlen++;}listele = random() % listlen;he = orighe;while(listele--) he = he->next;return he;}// 1.2. intset 型的隨機(jī)// intset.c/* Return random member */int64_t intsetRandom(intset *is) {// 這個(gè)隨機(jī)就簡單了,直接獲取隨機(jī)下標(biāo),因?yàn)閕ntset可以保證自身元素的完整性return _intsetGet(is,rand()%intrev32ifbe(is->length));}
OK, 至此,整個(gè)set數(shù)據(jù)結(jié)構(gòu)的解析算是完整了。
總體來說,set和hash類型的實(shí)現(xiàn)方式還是有很多不同的。不過沒啥大難度,就是幾個(gè)算法題解罷了。

騰訊、阿里、滴滴后臺(tái)面試題匯總總結(jié) — (含答案)
面試:史上最全多線程面試題 !
最新阿里內(nèi)推Java后端面試題
JVM難學(xué)?那是因?yàn)槟銢]認(rèn)真看完這篇文章

關(guān)注作者微信公眾號(hào) —《JAVA爛豬皮》
了解更多java后端架構(gòu)知識(shí)以及最新面試寶典


看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者源源不斷出文的動(dòng)力
作者:等你歸去來
出處:https://www.cnblogs.com/yougewe/p/12247580.html
