1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        VictorialMetrics存儲(chǔ)原理之索引存儲(chǔ)格式

        共 12396字,需瀏覽 25分鐘

         ·

        2022-06-10 02:49

        前文我們介紹了當(dāng)插入數(shù)據(jù)的時(shí)候會(huì)先去添加索引數(shù)據(jù),索引構(gòu)建完成后又是如何去持久化數(shù)據(jù)的呢?保存的數(shù)據(jù)又是怎樣的格式呢?本節(jié)我們將對(duì)此進(jìn)行詳細(xì)講解。

        添加索引數(shù)據(jù)

        索引構(gòu)建完成后會(huì)調(diào)用 AddItems 函數(shù)將索引添加到 Table 中去:

        //?lib/mergeset/table.go
        //?AddItems?添加指定的?items?到?table?中去
        func?(tb?*Table)?AddItems(items?[][]byte)?error?{??
        ???if?err?:=?tb.rawItems.addItems(tb,?items);?err?!=?nil?{??
        ??????return?fmt.Errorf("cannot?insert?data?into?%q:?%w",?tb.path,?err)??
        ???}??
        ???return?nil??
        }

        Table 的結(jié)構(gòu)如下所示:

        //?lib/mergeset/table.go
        //?Table?代表?mergeset?table.??
        type?Table?struct?{??
        ???activeMerges???uint64??
        ???mergesCount????uint64??
        ???itemsMerged????uint64??
        ???assistedMerges?uint64??

        ???//?merge?索引
        ???mergeIdx?uint64??
        ???//?路徑
        ???path?string??

        ???//?flush回調(diào)
        ???flushCallback?????????func()??
        ???flushCallbackWorkerWG?sync.WaitGroup??
        ???needFlushCallbackCall?uint32??
        ???//?在將指定項(xiàng)的整個(gè)塊刷新到持久存儲(chǔ)之前,在合并期間調(diào)用的回調(diào)
        ???prepareBlock?PrepareBlockCallback??

        ???//?parts?列表
        ???partsLock?sync.Mutex??
        ???parts?????[]*partWrapper??
        ??
        ???//?rawItems?包含最近添加的尚未轉(zhuǎn)換為?parts?的數(shù)據(jù)
        ???//?出于性能原因,未在搜索中使用?rawItems
        ???rawItems?rawItemsShards??
        ??
        ???snapshotLock?sync.RWMutex??
        ??
        ???flockF?*os.File??
        ??
        ???stopCh?chan?struct{}??
        ??
        ???partMergersWG?syncwg.WaitGroup??
        ???rawItemsFlusherWG?sync.WaitGroup??
        ???convertersWG?sync.WaitGroup??
        ???rawItemsPendingFlushesWG?syncwg.WaitGroup??
        }

        一個(gè)索引 Table 就對(duì)應(yīng)著一個(gè) indexDB,也就是數(shù)據(jù)目錄 indexdb 下面的文件夾:

        其中核心的是 partsrawItems 兩個(gè)屬性。

        • parts 主要是存儲(chǔ) merge 后的 blocks,一個(gè) part 與文件系統(tǒng)上的一個(gè)目錄對(duì)應(yīng),比如上圖中的 24_1_16F4A862471C1DC9 目錄就是一個(gè) part。
        • rawItems 是用于預(yù)處理 Items 的,是一個(gè) rawItemsShards 對(duì)象。

        rawItemsShards 結(jié)構(gòu)體定義如下所示:

        //?lib/mergeset/table.go
        type?rawItemsShards?struct?{??
        ???shardIdx?uint32??
        ??
        ???//?在多?cpu?系統(tǒng)上添加?rows?數(shù)據(jù)時(shí),shards?分片可以減少鎖競(jìng)爭(zhēng)?
        ???shards?[]rawItemsShard??
        }

        //?每個(gè)?table?的?rawItems?分片數(shù)?
        var?rawItemsShardsPerTable?=?cgroup.AvailableCPUs()??

        //?每個(gè)分片最大的Block數(shù)
        const?maxBlocksPerShard?=?512

        //?當(dāng)在打開Table的時(shí)候就會(huì)調(diào)用該函數(shù)進(jìn)行初始化
        func?(riss?*rawItemsShards)?init()?{??
        ???riss.shards?=?make([]rawItemsShard,?rawItemsShardsPerTable)??
        }

        //?添加?items?元素
        func?(riss?*rawItemsShards)?addItems(tb?*Table,?items?[][]byte)?error?{??
        ???n?:=?atomic.AddUint32(&riss.shardIdx,?1)??
        ???shards?:=?riss.shards??
        ???idx?:=?n?%?uint32(len(shards))??
        ???shard?:=?&shards[idx]??
        ???return?shard.addItems(tb,?items)??
        }

        rawItemsShards 其實(shí)就是加了一個(gè)分片功能用于保存索引數(shù)據(jù),addItems 函數(shù)就是將要添加的數(shù)據(jù)添加到對(duì)應(yīng)的分片上去,最終執(zhí)行的邏輯是 shard.addItems。

        //?lib/mergeset/table.go
        type?rawItemsShard?struct?{??
        ???mu????????????sync.Mutex??
        ???ibs???????????[]*inmemoryBlock??
        ???lastFlushTime?uint64??
        }??

        //?添加items元素
        func?(ris?*rawItemsShard)?addItems(tb?*Table,?items?[][]byte)?error?{??
        ???var?err?error??
        ???var?blocksToFlush?[]*inmemoryBlock??
        ??
        ???ris.mu.Lock()??
        ???ibs?:=?ris.ibs??
        ???if?len(ibs)?==?0?{??
        ??????ib?:=?getInmemoryBlock()??
        ??????ibs?=?append(ibs,?ib)??
        ??????ris.ibs?=?ibs??
        ???}??
        ???//?取最后一個(gè)內(nèi)存塊
        ???ib?:=?ibs[len(ibs)-1]??
        ???for?_,?item?:=?range?items?{?
        ??????//?添加索引item到內(nèi)存塊?
        ??????if?!ib.Add(item)?{??//?超過了內(nèi)存塊大小
        ?????????//?重新獲取一個(gè)內(nèi)存塊,此時(shí)肯定為空
        ?????????ib?=?getInmemoryBlock()??
        ?????????//?重新添加
        ?????????if?!ib.Add(item)?{??
        ????????????putInmemoryBlock(ib)??
        ????????????err?=?fmt.Errorf("cannot?insert?an?item?%q?into?an?empty?inmemoryBlock;?it?looks?like?the?item?is?too?large??len(item)=%d",?item,?len(item))??
        ????????????break??
        ?????????}??
        ?????????ibs?=?append(ibs,?ib)??
        ?????????ris.ibs?=?ibs??
        ??????}??
        ???}??
        ???//?超過了每個(gè)分片的最大內(nèi)存塊的數(shù)量
        ???if?len(ibs)?>=?maxBlocksPerShard?{??
        ??????//?將內(nèi)存塊放到待刷新的內(nèi)存塊列表中去
        ??????blocksToFlush?=?append(blocksToFlush,?ibs...)??
        ??????//?釋放前面的內(nèi)存塊資源
        ??????for?i?:=?range?ibs?{??
        ?????????ibs[i]?=?nil??
        ??????}??
        ??????ris.ibs?=?ibs[:0]??
        ??????ris.lastFlushTime?=?fasttime.UnixTimestamp()??
        ???}??
        ???ris.mu.Unlock()??
        ???//?執(zhí)行merge合并操作
        ???tb.mergeRawItemsBlocks(blocksToFlush,?false)??
        ???return?err??
        }

        //?lib/mergeset/encoding.go
        //?內(nèi)存中的一個(gè)Block塊結(jié)構(gòu)
        type?inmemoryBlock?struct?{??
        ???commonPrefix?[]byte??
        ???data?????????[]byte??//?用來存儲(chǔ)數(shù)據(jù)
        ???items????????[]Item??//?用來存儲(chǔ)每個(gè)item數(shù)據(jù)的起始偏移量
        }

        //?Item?表示用于存儲(chǔ)在?mergeset?中的單個(gè)?item?數(shù)據(jù)
        type?Item?struct?{??
        ???//?數(shù)據(jù)的開始偏移量
        ???Start?uint32??
        ???//?數(shù)據(jù)的結(jié)束偏移量
        ???End?uint32??
        }

        //?maxInmemoryBlockSize?是?memoryblock.data?的最大值。
        //??
        //?它必須適合?CPU?緩存大小,即當(dāng)前?CPU?的緩存大小為64kb。
        const?maxInmemoryBlockSize?=?64?*?1024

        //?Add?將?x?添加到內(nèi)存卡?ib?的末尾
        //??
        //?如果由于塊大小限制,x?未添加到?ib,則返回?false
        func?(ib?*inmemoryBlock)?Add(x?[]byte)?bool?{??
        ???data?:=?ib.data??
        ???//?操過塊大小限制了
        ???if?len(x)+len(data)?>?maxInmemoryBlockSize?{??
        ??????return?false??
        ???}??
        ???if?cap(data)?==?0?{??
        ??????//?預(yù)分配?data?和?items?以減少內(nèi)存分配
        ??????data?=?make([]byte,?0,?maxInmemoryBlockSize)??
        ??????ib.items?=?make([]Item,?0,?512)??
        ???}??
        ???dataLen?:=?len(data)??
        ???data?=?append(data,?x...)??//?將?x?添加到?data
        ???ib.items?=?append(ib.items,?Item{??//?更新?items
        ??????Start:?uint32(dataLen),??
        ??????End:???uint32(len(data)),??
        ???})??
        ???ib.data?=?data??
        ???return?true??
        }

        rawItemsShard 表示保存索引數(shù)據(jù)的一個(gè)分片,里面其實(shí)就是一個(gè) inmemoryBlock 的內(nèi)存塊切片,每個(gè)分片最多有 512 個(gè)內(nèi)存塊,每個(gè)內(nèi)存塊占用 64KB 的容量,當(dāng)每個(gè)分片中的內(nèi)存塊數(shù)量超過最大數(shù)量(512)會(huì)去將內(nèi)存塊數(shù)據(jù)刷新為 Part

        如果分片中的內(nèi)存塊數(shù)量沒超過上限,則會(huì)通過一個(gè)任務(wù)去定時(shí)(1s)將 rawItem 數(shù)據(jù)刷新(轉(zhuǎn)換)為 Part,以便它們對(duì)搜索可見。

        //?lib/mergeset/table.go
        //?將最近的?rawItem?刷新(轉(zhuǎn)換)為?Part,以便它們對(duì)搜索可見。
        const?rawItemsFlushInterval?=?time.Second

        //?啟動(dòng)?rawItems?Flusher?任務(wù)
        func?(tb?*Table)?startRawItemsFlusher()?{??
        ???tb.rawItemsFlusherWG.Add(1)??
        ???go?func()?{??
        ??????tb.rawItemsFlusher()??
        ??????tb.rawItemsFlusherWG.Done()??
        ???}()??
        }??
        ??
        func?(tb?*Table)?rawItemsFlusher()?{??
        ???ticker?:=?time.NewTicker(rawItemsFlushInterval)??
        ???defer?ticker.Stop()??
        ???for?{??
        ??????select?{??
        ??????case?<-tb.stopCh:??
        ?????????return??
        ??????case?<-ticker.C:??
        ?????????tb.flushRawItems(false)??
        ??????}??
        ???}??
        }

        合并內(nèi)存數(shù)據(jù)

        將內(nèi)存塊數(shù)據(jù)轉(zhuǎn)換為 Part 都是通過 mergeRawItemsBlocks 函數(shù)去實(shí)現(xiàn)的。

        //?lib/mergeset/table.go

        //?一次合并的默認(rèn)?parts?數(shù)
        //??
        //?這個(gè)數(shù)字是根據(jù)經(jīng)驗(yàn)得出的,它提供了盡可能低的開銷
        //?有關(guān)詳細(xì)信息,請(qǐng)參閱?appendPartsToMerge?test
        const?defaultPartsToMerge?=?15

        //?merge?內(nèi)存塊數(shù)據(jù)
        func?(tb?*Table)?mergeRawItemsBlocks(ibs?[]*inmemoryBlock,?isFinal?bool)?{??
        ???if?len(ibs)?==?0?{??
        ??????return??
        ???}??
        ???tb.partMergersWG.Add(1)??
        ???defer?tb.partMergersWG.Done()??
        ??
        ???pws?:=?make([]*partWrapper,?0,?(len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge)??
        ???var?pwsLock?sync.Mutex??
        ???var?wg?sync.WaitGroup??
        ???for?len(ibs)?>?0?{??
        ??????//?一次最大合并的內(nèi)存塊數(shù)量
        ??????n?:=?defaultPartsToMerge??
        ??????if?n?>?len(ibs)?{??
        ?????????n?=?len(ibs)??
        ??????}??
        ??????wg.Add(1)??
        ??????go?func(ibsPart?[]*inmemoryBlock)?{??
        ?????????defer?wg.Done()??
        ?????????//?merge?inmemoryBlock
        ?????????pw?:=?tb.mergeInmemoryBlocks(ibsPart)??
        ?????????if?pw?==?nil?{??
        ????????????return??
        ?????????}??
        ?????????pw.isInMerge?=?true??
        ?????????pwsLock.Lock()??
        ?????????pws?=?append(pws,?pw)??
        ?????????pwsLock.Unlock()??
        ??????}(ibs[:n])??
        ??????ibs?=?ibs[n:]??
        ???}??
        ???wg.Wait()??
        ???if?len(pws)?>?0?{??
        ??????if?err?:=?tb.mergeParts(pws,?nil,?true);?err?!=?nil?{??
        ?????????logger.Panicf("FATAL:?cannot?merge?raw?parts:?%s",?err)??
        ??????}??
        ??????if?tb.flushCallback?!=?nil?{??
        ?????????if?isFinal?{??
        ????????????tb.flushCallback()??
        ?????????}?else?{??
        ????????????atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall,?0,?1)??
        ?????????}??
        ??????}??
        ???}??
        ??
        ???for?{??
        ??????tb.partsLock.Lock()??
        ??????ok?:=?len(tb.parts)?<=?maxParts??
        ??????tb.partsLock.Unlock()??
        ??????if?ok?{??
        ?????????return??
        ??????}??
        ??
        ??????//?The?added?part?exceeds?maxParts?count.?Assist?with?merging?other?parts.??
        ??????//??????
        ??????//?Prioritize?assisted?merges?over?searches.??????
        ??????storagepacelimiter.Search.Inc()??
        ??????err?:=?tb.mergeExistingParts(false)??
        ??????storagepacelimiter.Search.Dec()??
        ??????if?err?==?nil?{??
        ?????????atomic.AddUint64(&tb.assistedMerges,?1)??
        ?????????continue??
        ??????}??
        ??????if?errors.Is(err,?errNothingToMerge)?||?errors.Is(err,?errForciblyStopped)?{??
        ?????????return??
        ??????}??
        ??????logger.Panicf("FATAL:?cannot?merge?small?parts:?%s",?err)??
        ???}??
        }

        mergeRawItemsBlocks 函數(shù)將指定的內(nèi)存塊進(jìn)行 merge 合并操作,一次合并最大的內(nèi)存塊數(shù)量為 15,然后在獨(dú)立的 goroutine 中去進(jìn)行合并操作,使用 mergeInmemoryBlocks 函數(shù)。

        //?lib/mergeset/table.go
        //?merge?InmemoryBlocks
        func?(tb?*Table)?mergeInmemoryBlocks(ibs?[]*inmemoryBlock)?*partWrapper?{??
        ???//?將?InmemoryBlock?列表轉(zhuǎn)換成?inmemoryPart?列表?
        ???//?inmemoryPart?表示內(nèi)存中的Part
        ???mps?:=?make([]*inmemoryPart,?0,?len(ibs))??
        ???for?_,?ib?:=?range?ibs?{??
        ??????if?len(ib.items)?==?0?{??
        ?????????continue??
        ??????}??
        ??????mp?:=?getInmemoryPart()??
        ??????mp.Init(ib)?//?將inmemoryBlock轉(zhuǎn)換為inmemoryPart
        ??????putInmemoryBlock(ib)??
        ??????mps?=?append(mps,?mp)??
        ???}??
        ???if?len(mps)?==?0?{??
        ??????return?nil??
        ???}??
        ???if?len(mps)?==?1?{??
        ??????//?沒有要合并的內(nèi)容。只需返回單個(gè)?inmemory?part。
        ??????mp?:=?mps[0]??
        ??????p?:=?mp.NewPart()??
        ??????return?&partWrapper{??
        ?????????p:????????p,??
        ?????????mp:???????mp,??
        ?????????refCount:?1,??
        ??????}??
        ???}??
        ???defer?func()?{??
        ??????for?_,?mp?:=?range?mps?{??
        ?????????putInmemoryPart(mp)??
        ??????}??
        ???}()??
        ??
        ???atomic.AddUint64(&tb.mergesCount,?1)??
        ???atomic.AddUint64(&tb.activeMerges,?1)??
        ???defer?atomic.AddUint64(&tb.activeMerges,?^uint64(0))??
        ??
        ???//?為每個(gè)?`inmemoryPart`?構(gòu)造?`blockStreamReader`,?用于迭代讀取?items
        ???bsrs?:=?make([]*blockStreamReader,?0,?len(mps))??
        ???for?_,?mp?:=?range?mps?{??
        ??????bsr?:=?getBlockStreamReader()??
        ??????bsr.InitFromInmemoryPart(mp)??
        ??????bsrs?=?append(bsrs,?bsr)??
        ???}??
        ??
        ???//?準(zhǔn)備一個(gè)?blockStreamWriter?用于合并寫入的?part
        ???bsw?:=?getBlockStreamWriter()??
        ???//?不要通過?getInmemoryPart()?獲取?mpDst,因?yàn)榕c池中的其他條目相比,它的大小可能太大。?
        ???//?這可能會(huì)導(dǎo)致內(nèi)存使用量增加,因?yàn)榇嬖诖罅康乃槠?
        ???//?創(chuàng)建一個(gè)新的?inmemoryPart,接收合并的數(shù)據(jù)
        ???mpDst?:=?&inmemoryPart{}??
        ???bsw.InitFromInmemoryPart(mpDst)??
        ??
        ???//?開始?merge?數(shù)據(jù)
        ???//?該?merge?不應(yīng)該被?stopCh?中斷,因?yàn)樗赡苁?stopCh?關(guān)閉后的最終結(jié)果
        ???err?:=?mergeBlockStreams(&mpDst.ph,?bsw,?bsrs,?tb.prepareBlock,?nil,?&tb.itemsMerged)??
        ???if?err?!=?nil?{??
        ??????logger.Panicf("FATAL:?cannot?merge?inmemoryBlocks:?%s",?err)??
        ???}??
        ???putBlockStreamWriter(bsw)??
        ???for?_,?bsr?:=?range?bsrs?{??
        ??????putBlockStreamReader(bsr)??
        ???}??
        ??
        ???p?:=?mpDst.NewPart()??
        ???return?&partWrapper{??
        ??????p:????????p,??
        ??????mp:???????mpDst,??
        ??????refCount:?1,??
        ???}??
        }

        上面的函數(shù)會(huì)將指定的內(nèi)存塊轉(zhuǎn)換成 partWrapper,該結(jié)構(gòu)就是一個(gè)包含 partinmemoryPart 的包裝器。

        //?lib/mergeset/table.go
        type?partWrapper?struct?{??
        ???p?*part??
        ??
        ???mp?*inmemoryPart??
        ??
        ???refCount?uint64??
        ??
        ???isInMerge?bool??
        }

        part 的結(jié)構(gòu)如下所示:

        //?lib/mergeset/part.go
        type?part?struct?{??
        ???ph?partHeader??
        ??
        ???path?string??
        ??
        ???size?uint64??
        ??
        ???mrs?[]metaindexRow??
        ??
        ???indexFile?fs.MustReadAtCloser??
        ???itemsFile?fs.MustReadAtCloser??
        ???lensFile??fs.MustReadAtCloser??
        }

        一個(gè) part 就是 Table 下面的一個(gè)數(shù)據(jù)目錄。

        part 中包含一個(gè) partHeader,該屬性中包含當(dāng)前 part 的一些 Meta 信息,一共有多少個(gè) items、有多少 blocks、第一個(gè)和最后一個(gè) item,對(duì)應(yīng)著 part 目錄下面的 metadata.json 文件。

        //?lib/mergeset/part_header.go
        type?partHeader?struct?{??
        ???//?part?包含的?items?數(shù)
        ???itemsCount?uint64??
        ??
        ???//?part?包含的?blocks?數(shù)
        ???blocksCount?uint64??
        ??
        ???//?part?中的第一個(gè)?item
        ???firstItem?[]byte??
        ??
        ???//?part?中的最后一個(gè)?item
        ???lastItem?[]byte??
        }

        part 中另外的屬性 path 表示當(dāng)前 part 的路徑,size 表示大小,另外三個(gè)屬性 indexFile、itemsFilelensFile 對(duì)應(yīng)中 part 目錄下面的三個(gè)文件:index.bin、items.binlens.bin。此外 part 結(jié)構(gòu)中還有最后一個(gè) mrs 屬性,是一個(gè) []metaindexRow。

        //?lib/mergeset/metaindex_row.go

        //?metaindexRow?描述了一個(gè)?blockHeaders?即索引塊。?
        type?metaindexRow?struct?{??
        ???//?第一個(gè)?block?中的第一個(gè)?item?元素
        ???//?它用于快速查找所需的索引塊
        ???firstItem?[]byte??
        ??
        ???//?塊包含的?blockHeaders?的數(shù)量
        ???blockHeadersCount?uint32??
        ??
        ???//?索引文件中塊的偏移量
        ???indexBlockOffset?uint64??
        ??
        ???//?索引文件中塊的大小
        ???indexBlockSize?uint32??
        }

        除了 part 之外還有一個(gè)內(nèi)存中的 inmemoryPart 結(jié)構(gòu),其基本結(jié)構(gòu)和 part 類似,不同的是幾個(gè)相關(guān)的屬性不是文件對(duì)象,而是 ByteBuffer,因?yàn)槭莾?nèi)存中的結(jié)構(gòu)。

        //?lib/mergeset/inmemory_part.go
        //?在內(nèi)存中的?Part?結(jié)構(gòu)
        type?inmemoryPart?struct?{??
        ???//?partHeader?記錄?itemsCount,?blocksCount,?firstItem,?lastItem?信息,?最后會(huì)序列化到?metadata.json
        ???ph?partHeader??
        ???//?當(dāng)前?block?的?header?信息,有?commonPrefix,?firstItem,?marshalType,?itemsCount,?itemsBlockOffset,?lenBlockOffset,?itemsBlockSize,?lenBlockSize
        ???bh?blockHeader??
        ???//?當(dāng)前?block?的?metaindex?信息,存儲(chǔ)了當(dāng)前?blockHeader?的?firstItem,?blockHeaderCount,?indexBlockOffset,?indexBlockSize
        ???mr?metaindexRow??
        ???
        ???//?用于序列化后寫入內(nèi)存/磁盤文件使用
        ???metaindexData?bytesutil.ByteBuffer??//?->?metaindex.bin
        ???indexData?????bytesutil.ByteBuffer??//?->?index.bin
        ???itemsData?????bytesutil.ByteBuffer??//?->?items.bin
        ???lensData??????bytesutil.ByteBuffer??//?->?lens.bin
        }

        其他幾個(gè)屬性上面介紹過,blockHeader 結(jié)構(gòu)如下所示,用于記錄 block 頭信息:

        //?lib/mergeset/block_header.go
        type?blockHeader?struct?{??
        ???//?塊中所有?items?的公用前綴??
        ???commonPrefix?[]byte??
        ??
        ???//?第一個(gè)?item
        ???firstItem?[]byte??
        ??
        ???//?用于塊壓縮的?Marshal?類型
        ???marshalType?marshalType??
        ??
        ???//?塊中的?items?數(shù),不包括第一個(gè)?item
        ???itemsCount?uint32??
        ??
        ???//?items?block?的偏移量
        ???itemsBlockOffset?uint64??
        ??
        ???//?lens?block?的偏移量
        ???lensBlockOffset?uint64??
        ??
        ???//?items?block?的大小
        ???itemsBlockSize?uint32??
        ??
        ???//?lens?block?的大小
        ???lensBlockSize?uint32??
        }

        整個(gè) part 的結(jié)構(gòu)看上去確實(shí)比較復(fù)雜,為什么需要設(shè)計(jì)這些屬性?核心肯定就是為了快速索引,我們先往下分析,待會(huì)再回過頭來看。

        inmemoryPartpart 讀入內(nèi)存中的結(jié)構(gòu), 在 inmemoryBlock merge 之前,每個(gè) inmemoryBlock 都會(huì)先通過 mp.Init 轉(zhuǎn)換成一個(gè) inmemoryPart 的結(jié)構(gòu),inmemoryPartmetaindexData、indexDataitemsDatalensData 數(shù)據(jù)結(jié)構(gòu)與磁盤對(duì)應(yīng)的文件內(nèi)容一致。

        序列化數(shù)據(jù)

        現(xiàn)在我們?cè)倩氐缴厦娴?mergeInmemoryBlocks 函數(shù),流程如下所示:

        • 1.將所有的 inmemoryBlock 轉(zhuǎn)換為 inmemoryPart 結(jié)構(gòu)
        • 2.為每個(gè) inmemoryPart 構(gòu)造 blockStreamReader,用于迭代讀取 items
        • 3.創(chuàng)建一個(gè)新的 inmemoryPart,并構(gòu)造一個(gè) blockSteamWriter 用于合并寫入的數(shù)據(jù)
        • 4.然后調(diào)用 mergeBlockStreams 函數(shù)執(zhí)行真正的 merge 操作

        首先通過 Init 函數(shù)將 inmemoryBlock 轉(zhuǎn)換為 inmemoryPart 結(jié)構(gòu)。

        //?lib/mergeset/inmemory_part.go
        //?Init?初始化?mp?從?ib.?
        func?(mp?*inmemoryPart)?Init(ib?*inmemoryBlock)?{??
        ???mp.Reset()??
        ???
        ???sb?:=?&storageBlock{}??
        ???sb.itemsData?=?mp.itemsData.B[:0]??
        ???sb.lensData?=?mp.lensData.B[:0]??
        ??
        ???//?使用盡可能小的壓縮等級(jí)來壓縮?inmemoryPart,因?yàn)樗芸炀蜁?huì)被合并到文件?part?去。
        ???compressLevel?:=?-5??
        ???//?序列化亂序的數(shù)據(jù)
        ???mp.bh.firstItem,?mp.bh.commonPrefix,?mp.bh.itemsCount,?mp.bh.marshalType?=?ib.MarshalUnsortedData(sb,?mp.bh.firstItem[:0],?mp.bh.commonPrefix[:0],?compressLevel)??

        ???//?獲取?partHeader?值
        ???mp.ph.itemsCount?=?uint64(len(ib.items))??
        ???mp.ph.blocksCount?=?1??
        ???mp.ph.firstItem?=?append(mp.ph.firstItem[:0],?ib.items[0].String(ib.data)...)??
        ???mp.ph.lastItem?=?append(mp.ph.lastItem[:0],?ib.items[len(ib.items)-1].String(ib.data)...)??

        ???//?獲取itemsData,更新blockHeader的items偏移和數(shù)量
        ???mp.itemsData.B?=?sb.itemsData??
        ???mp.bh.itemsBlockOffset?=?0??
        ???mp.bh.itemsBlockSize?=?uint32(len(mp.itemsData.B))??

        ???//?獲取lensData,更新blockHeader的lens偏移和數(shù)量
        ???mp.lensData.B?=?sb.lensData??
        ???mp.bh.lensBlockOffset?=?0??
        ???mp.bh.lensBlockSize?=?uint32(len(mp.lensData.B))??

        ???//?獲取?indexData,blockHeader序列化的值
        ???bb?:=?inmemoryPartBytePool.Get()??
        ???bb.B?=?mp.bh.Marshal(bb.B[:0])??
        ???mp.indexData.B?=?encoding.CompressZSTDLevel(mp.indexData.B[:0],?bb.B,?0)??

        ???//?獲取?metaindexData,metaindexRow序列化的值
        ???mp.mr.firstItem?=?append(mp.mr.firstItem[:0],?mp.bh.firstItem...)??
        ???mp.mr.blockHeadersCount?=?1??
        ???mp.mr.indexBlockOffset?=?0??
        ???mp.mr.indexBlockSize?=?uint32(len(mp.indexData.B))??
        ???bb.B?=?mp.mr.Marshal(bb.B[:0])??
        ???mp.metaindexData.B?=?encoding.CompressZSTDLevel(mp.metaindexData.B[:0],?bb.B,?0)??
        ???inmemoryPartBytePool.Put(bb)??
        }

        上面的函數(shù)將 inmemoryBlock 轉(zhuǎn)換成 inmemoryPart,首先會(huì)通過一個(gè) MarshalUnsortedData 函數(shù)來序列化未排序的數(shù)據(jù)。

        //?MarshalUnsortedData?序列化未排序的?items?從?ib?到?sb.
        //??
        //?It?also:??
        //?-?將第一個(gè)?item?追加到?firstItemDst?并返回結(jié)果??
        //?-?將所有?item?的公共前綴附加到?commonPrefixDst?并返回結(jié)果??
        //?-?返回包含第一個(gè)?item?的編碼項(xiàng)的數(shù)量??
        //?-?返回用于編碼的?marshal?類型??
        func?(ib?*inmemoryBlock)?MarshalUnsortedData(sb?*storageBlock,?firstItemDst,?commonPrefixDst?[]byte,?compressLevel?int)?([]byte,?[]byte,?uint32,?marshalType)?{??
        ???if?!ib.isSorted()?{??
        ??????sort.Sort(ib)?//?排序??
        ???}??
        ???//?更新內(nèi)存塊的公共前綴??
        ???ib.updateCommonPrefix()??
        ???//?序列化數(shù)據(jù)??
        ???return?ib.marshalData(sb,?firstItemDst,?commonPrefixDst,?compressLevel)??
        }

        上面的序列化函數(shù)中首先會(huì)對(duì)未排序的數(shù)據(jù)進(jìn)行排序,然后更新內(nèi)存塊的公共前綴:

        //?lib/mergeset/encoding.go
        //?更新公共前綴??
        func?(ib?*inmemoryBlock)?updateCommonPrefix()?{??
        ???ib.commonPrefix?=?ib.commonPrefix[:0]??//?公共前綴
        ???if?len(ib.items)?==?0?{??
        ??????return??
        ???}??
        ???items?:=?ib.items??????????//?數(shù)據(jù)前后位置??
        ???data?:=?ib.data????????????//?數(shù)據(jù)??
        ???cp?:=?items[0].Bytes(data)?//?第一段數(shù)據(jù)??
        ???if?len(cp)?==?0?{??
        ??????return??
        ???}??
        ???for?_,?it?:=?range?items[1:]?{?//?后面的數(shù)據(jù)??
        ??????//?計(jì)算公共前綴的長(zhǎng)度??
        ??????cpLen?:=?commonPrefixLen(cp,?it.Bytes(data))??
        ??????if?cpLen?==?0?{??
        ?????????return??
        ??????}??
        ??????//?截取公共前綴數(shù)據(jù)??
        ??????cp?=?cp[:cpLen]??
        ???}??
        ???//?設(shè)置內(nèi)存塊的公共前綴??
        ???ib.commonPrefix?=?append(ib.commonPrefix[:0],?cp...)??
        }

        公共前綴就是把每段數(shù)據(jù)包含的共同前綴提取出來,這樣存儲(chǔ)的時(shí)候后面就可以不需要存儲(chǔ)共同的部分了,減少存儲(chǔ)空間。

        公共前綴提取出來后,接下來調(diào)用 marshalData 函數(shù)去序列化數(shù)據(jù)。

        //?lib/mergeset/encoding.go
        //?前提條件:??
        //?-?ib.items?必須排序??
        //?-?updateCommonPrefix?必須被調(diào)用??
        //?序列化數(shù)據(jù)??
        func?(ib?*inmemoryBlock)?marshalData(sb?*storageBlock,?firstItemDst,?commonPrefixDst?[]byte,?compressLevel?int)?([]byte,?[]byte,?uint32,?marshalType)?{??
        ???......??
        ???//?拷貝?inmemoryBlock?數(shù)據(jù)塊的?firstItem(排序后的第一條數(shù)據(jù))??
        ???data?:=?ib.data??????????????????????//?內(nèi)存塊數(shù)據(jù)??
        ???firstItem?:=?ib.items[0].Bytes(data)?//?第一條數(shù)據(jù)??
        ???firstItemDst?=?append(firstItemDst,?firstItem...)??
        ???//?最大公共前綴??
        ???commonPrefixDst?=?append(commonPrefixDst,?ib.commonPrefix...)?
        ???//?內(nèi)存塊數(shù)據(jù)小于2段或(數(shù)據(jù)大小-公共前綴長(zhǎng)度*數(shù)據(jù)段大小?
        ???if?len(data)-len(ib.commonPrefix)*len(ib.items)?64
        ?||?len(ib.items)?2?{??
        ??????//?對(duì)small?block使用普通序列化,因?yàn)樗阋??
        ??????ib.marshalDataPlain(sb)??
        ??????return?firstItemDst,?commonPrefixDst,?uint32(len(ib.items)),?marshalTypePlain??
        ???}??
        ??
        ???bbItems?:=?bbPool.Get()??
        ???bItems?:=?bbItems.B[:0]?//?保存目的?items?數(shù)據(jù)的內(nèi)存?buffer??
        ??
        ???bbLens?:=?bbPool.Get()??
        ???bLens?:=?bbLens.B[:0]?//?保存目的?lens?數(shù)據(jù)的內(nèi)存buffer??
        ??
        ???//?序列化?items?數(shù)據(jù)??
        ???//?第一項(xiàng)數(shù)據(jù)不需要存儲(chǔ),所以獲取的?Uint64s?大小要減1??
        ???xs?:=?encoding.GetUint64s(len(ib.items)?-?1)?
        ???defer?encoding.PutUint64s(xs)??
        ??
        ???cpLen?:=?len(ib.commonPrefix)?//?公共前綴的長(zhǎng)度??
        ???prevItem?:=?firstItem[cpLen:]?//?第一項(xiàng)數(shù)據(jù)(排除公共前綴)??
        ???prevPrefixLen?:=?uint64(0)??
        ???//?從第二個(gè)元素開始遍歷(第一個(gè)?firstItem?單獨(dú)存儲(chǔ))??
        ???for?i,?it?:=?range?ib.items[1:]?{?
        ??????//?偏移到公共前綴之后的位置
        ??????it.Start?+=?uint32(cpLen)???
        ??????//?Bytes(data)?得到的數(shù)據(jù)不包含公共前綴的部分???????????????????????????
        ??????item?:=?it.Bytes(data)??
        ??????//?計(jì)算第?N?項(xiàng)和?N-1?項(xiàng)的公共前綴長(zhǎng)度???????????????????????????????
        ??????prefixLen?:=?uint64(commonPrefixLen(prevItem,?item))???
        ??????//?僅僅只把差異的部分拷貝到目的buffer?
        ??????bItems?=?append(bItems,?item[prefixLen:]...)?????
        ??????//?第一次,與0異或,還是等于原值。異或后,兩個(gè)整數(shù)值前面相同的部分都為0了,數(shù)值變得更短,能夠便于壓縮。?????
        ??????xLen?:=?prefixLen?^?prevPrefixLen???????
        ??????//?上次的除去公共前綴的item?????????????????
        ??????prevItem?=?item??????????????????
        ??????//?上次計(jì)算得到的公共前綴長(zhǎng)度??????????????????????
        ??????prevPrefixLen?=?prefixLen????????????????????????????
        ??
        ??????xs.A[i]?=?xLen?//?異或后的公共前綴值??
        ???}

        ???//?對(duì)N-1個(gè)長(zhǎng)度進(jìn)行序列化(將uint64數(shù)組序列化成byte數(shù)組)??
        ???bLens?=?encoding.MarshalVarUint64s(bLens,?xs.A)????????????????????????????????????
        ???//?將items數(shù)據(jù)(只有差異的部分)ZSTD壓縮后,寫入storageBlock?
        ???sb.itemsData?=?encoding.CompressZSTDLevel(sb.itemsData[:0],?bItems,?compressLevel)??
        ??
        ???bbItems.B?=?bItems??
        ???bbPool.Put(bbItems)??
        ??
        ???//?序列化?lens?數(shù)據(jù)??
        ???//?第一項(xiàng)數(shù)據(jù)大?。ㄅ懦睬熬Y)
        ???prevItemLen?:=?uint64(len(firstItem)?-?cpLen)???
        ???for?i,?it?:=?range?ib.items[1:]?{?????????????//?從第二個(gè)元素開始遍歷?
        ??????//?item長(zhǎng)度?=?End-Start-公共前綴大小???
        ??????itemLen?:=?uint64(int(it.End-it.Start)?-?cpLen)?
        ??????//?與前面一個(gè)元素長(zhǎng)度異或?
        ??????xLen?:=?itemLen?^?prevItemLen????
        ??????//?上次去除公共前綴的長(zhǎng)度??????????????????
        ??????prevItemLen?=?itemLen???????????????????????????
        ??
        ??????xs.A[i]?=?xLen?//?異或后的元素長(zhǎng)度??
        ???}??
        ???//?前面記錄的是兩兩相對(duì)的長(zhǎng)度,這里記錄的是數(shù)據(jù)的真實(shí)長(zhǎng)度??
        ???//?長(zhǎng)度信息包含兩種,相對(duì)長(zhǎng)度和總長(zhǎng)度??
        ???bLens?=?encoding.MarshalVarUint64s(bLens,?xs.A)???
        ???//?將lens數(shù)據(jù)進(jìn)行ZSTD壓縮后,寫入storageBlock????????????????????????????????
        ???sb.lensData?=?encoding.CompressZSTDLevel(sb.lensData[:0],?bLens,?compressLevel)?
        ??
        ???bbLens.B?=?bLens??
        ???bbPool.Put(bbLens)??
        ??
        ???//?如果壓縮不到90%則選擇不壓縮??
        ???if?float64(len(sb.itemsData))?>?0.9*float64(len(data)-len(ib.commonPrefix)*len(ib.items))?{??
        ??????//?壓縮率不高的時(shí)候,選擇不壓縮??
        ??????ib.marshalDataPlain(sb)??
        ??????return?firstItemDst,?commonPrefixDst,?uint32(len(ib.items)),?marshalTypePlain??
        ???}??
        ??
        ???//?很好的壓縮率??
        ???return?firstItemDst,?commonPrefixDst,?uint32(len(ib.items)),?marshalTypeZSTD??
        }

        上面的序列化函數(shù)看上去比較復(fù)雜,實(shí)際上核心的一點(diǎn)就是想辦法盡可能減少存儲(chǔ)空間。首先將數(shù)據(jù)塊的第一個(gè)數(shù)據(jù)拷貝出來放入 firstItemDst,然后后面就從第二個(gè)元素開始去循環(huán)處理,首先計(jì)算第 N 項(xiàng)和 N-1 項(xiàng)的公共前綴長(zhǎng)度,然后將差異的數(shù)據(jù)部分保存起來,為了能夠反序列化回?cái)?shù)據(jù),還需要將兩兩之間公共前綴的長(zhǎng)度保存下來,為了能夠便于壓縮,使用異或的方式來計(jì)算兩兩之間的公共前綴長(zhǎng)度值。

        循環(huán)計(jì)算后,將保存的兩兩之間的公共前綴長(zhǎng)度進(jìn)行序列化,下面的函數(shù)將一個(gè) uint64 類型的切片轉(zhuǎn)換成字節(jié)切片,如果數(shù)據(jù)小于 128 直接轉(zhuǎn)換即可,如果大于 127 則用一個(gè) 7bit 來表示數(shù)值的內(nèi)容,最高位后面的一個(gè)字節(jié)用來表示長(zhǎng)度,這樣就可以用變長(zhǎng)長(zhǎng)度來序列化數(shù)值,而不是每個(gè)數(shù)值都占用固定的長(zhǎng)度。

        //?lib/encoding/int.go
        //?將uint64切片轉(zhuǎn)成字節(jié)切片
        func?MarshalVarUint64s(dst?[]byte,?us?[]uint64)?[]byte?{??
        ???for?_,?u?:=?range?us?{??
        ??????if?u?0x80?{?//?小于128,直接加入到?dst,能直接存到?byte?中去??
        ?????????//?Fast?path??
        ?????????dst?=?append(dst,?byte(u))??
        ?????????continue??
        ??????}??
        ??????for?u?>?0x7f?{?//?大于127,則超過的部分保留為?0x80,低位右移7位繼續(xù)計(jì)算??
        ?????????dst?=?append(dst,?0x80|byte(u))??
        ?????????u?>>=?7??
        ??????}??
        ??????dst?=?append(dst,?byte(u))??
        ???}??
        ???return?dst??
        }

        長(zhǎng)度數(shù)據(jù)序列化后,將 items 數(shù)據(jù)(只有差異的部分)進(jìn)行 ZSTD 壓縮后,寫入 storageBlock。

        只記錄兩兩之間的公共前綴長(zhǎng)度還不夠,還需要記錄數(shù)據(jù)的真實(shí)長(zhǎng)度,最后同樣再將 lens 數(shù)據(jù)進(jìn)行 ZSTD 壓縮后,寫入 storageBlock。

        如果最后的結(jié)果壓縮不到 90% 則選擇不壓縮,不壓縮則使用 marshalDataPlain 函數(shù)進(jìn)行序列化:

        //?lib/mergeset/encoding.go
        //?普通序列化數(shù)據(jù)??
        func?(ib?*inmemoryBlock)?marshalDataPlain(sb?*storageBlock)?{??
        ???data?:=?ib.data??
        ??
        ???//?序列化?items?數(shù)據(jù)??
        ???//?不需要序列化第一項(xiàng)數(shù)據(jù),因?yàn)樗鼤?huì)在?marshalData?中返回給調(diào)用者。??
        ???cpLen?:=?len(ib.commonPrefix)?//?公共前綴長(zhǎng)度??
        ???b?:=?sb.itemsData[:0]??
        ???for?_,?it?:=?range?ib.items[1:]?{?//?第一項(xiàng)之后的數(shù)據(jù)??
        ??????it.Start?+=?uint32(cpLen)?????????//?跳過公共前綴??
        ??????b?=?append(b,?it.String(data)...)?//?添加移出公共前綴的數(shù)據(jù)??
        ???}??
        ???sb.itemsData?=?b?//?itemsData數(shù)據(jù)??
        ??
        ???//?序列化?lens?數(shù)據(jù)??
        ???b?=?sb.lensData[:0]??
        ???for?_,?it?:=?range?ib.items[1:]?{?//?第一項(xiàng)之后的數(shù)據(jù)??
        ??????//?原始的End-Start-公共前綴長(zhǎng)度??
        ??????b?=?encoding.MarshalUint64(b,?uint64(int(it.End-it.Start)-cpLen))?
        ???}??
        ???sb.lensData?=?b??
        }

        經(jīng)過上面的序列化過后就可以得到第一個(gè)數(shù)據(jù)、公共前綴、items 個(gè)數(shù)以及序列化類型,然后將這些數(shù)據(jù)存入 blockHeader 中去,后面就是一些比較簡(jiǎn)單的常規(guī)操作。

        轉(zhuǎn)換成 inmemoryPart 后,再包裝成 blockStreamReader,創(chuàng)建一個(gè)新的 inmemoryPart,并構(gòu)造一個(gè) blockSteamWriter 用于合并寫入的數(shù)據(jù),然后調(diào)用 mergeBlockStreams 函數(shù)執(zhí)行真正的 merge 操作。

        //?lib/mergeset/merge.go
        //?mergeBlockStreams?合并?bsrs?并將結(jié)果寫入?bsw
        //??
        //?也填充了?ph??
        //??
        //?prepareBlock?是可選的?
        //??
        //?當(dāng)?stopCh?關(guān)閉時(shí),該函數(shù)立即返回
        //??
        //?它還以原子方式將合并的?items?添加到?itemsMerged
        func?mergeBlockStreams(ph?*partHeader,?bsw?*blockStreamWriter,?bsrs?[]*blockStreamReader,?prepareBlock?PrepareBlockCallback,?stopCh?<-chan?struct{},??
        ???itemsMerged?*uint64)
        ?error
        ?{??
        ???//?將多個(gè)?blockStreamReader?構(gòu)造成一個(gè)?blockStreamMerger?結(jié)構(gòu)
        ???bsm?:=?bsmPool.Get().(*blockStreamMerger)??
        ???if?err?:=?bsm.Init(bsrs,?prepareBlock);?err?!=?nil?{??
        ??????return?fmt.Errorf("cannot?initialize?blockStreamMerger:?%w",?err)??
        ???}??
        ???err?:=?bsm.Merge(bsw,?ph,?stopCh,?itemsMerged)??
        ???bsm.reset()??
        ???bsmPool.Put(bsm)??
        ???bsw.MustClose()??
        ???if?err?==?nil?{??
        ??????return?nil??
        ???}??
        ???return?fmt.Errorf("cannot?merge?%d?block?streams:?%s:?%w",?len(bsrs),?bsrs,?err)??
        }

        首先把多個(gè) blockStreamReader 構(gòu)造成一個(gè) blockStreamMerger 結(jié)構(gòu), merger 里面主要是一個(gè) bsrHeap 堆用于維護(hù) bsrs,用于 merge 數(shù)據(jù)時(shí)的排序。首先通過 merger 的 Init 函數(shù)構(gòu)造堆排序的結(jié)構(gòu),然后核心是調(diào)用 merger 的 Merge 函數(shù)進(jìn)行處理。

        //?lib/mergeset/merge.go
        func?(bsm?*blockStreamMerger)?Merge(bsw?*blockStreamWriter,?ph?*partHeader,?stopCh?<-chan?struct{},?itemsMerged?*uint64)?error?{??
        again:??
        ???if?len(bsm.bsrHeap)?==?0?{??
        ??????//?將最后的?inmemoryBlock(可能不完整)寫入?bsw
        ??????bsm.flushIB(bsw,?ph,?itemsMerged)??
        ??????return?nil??
        ???}??
        ??
        ???select?{??
        ???case?<-stopCh:??
        ??????return?errForciblyStopped??
        ???default:??
        ???}??
        ???//?取出?blockStreamReader
        ???bsr?:=?heap.Pop(&bsm.bsrHeap).(*blockStreamReader)??
        ??
        ???var?nextItem?[]byte??//?下一個(gè)?blockStreamReader
        ???hasNextItem?:=?false??
        ???if?len(bsm.bsrHeap)?>?0?{??
        ??????nextItem?=?bsm.bsrHeap[0].bh.firstItem??
        ??????hasNextItem?=?true??
        ???}??
        ???items?:=?bsr.Block.items??
        ???data?:=?bsr.Block.data??
        ???//?循環(huán)所有的?items
        ???for?bsr.blockItemIdx?len(bsr.Block.items)?{??
        ??????item?:=?items[bsr.blockItemIdx].Bytes(data)??
        ??????if?hasNextItem?&&?string(item)?>?string(nextItem)?{??
        ?????????break??
        ??????}??
        ??????//?添加元素
        ??????if?!bsm.ib.Add(item)?{??
        ?????????//?bsm.ib?已滿,將其刷新到?bsw?并繼續(xù)
        ?????????bsm.flushIB(bsw,?ph,?itemsMerged)??
        ?????????continue??
        ??????}??
        ??????bsr.blockItemIdx++??
        ???}??
        ???if?bsr.blockItemIdx?==?len(bsr.Block.items)?{??
        ??????//?bsr.Block?已完全讀取,處理下一個(gè)?block
        ??????if?bsr.Next()?{??
        ?????????heap.Push(&bsm.bsrHeap,?bsr)??
        ?????????goto?again??
        ??????}??
        ??????if?err?:=?bsr.Error();?err?!=?nil?{??
        ?????????return?fmt.Errorf("cannot?read?storageBlock:?%w",?err)??
        ??????}??
        ??????goto?again??
        ???}??

        ???//?bsr.Block?中的下一個(gè)?item?超過了?nextItem
        ???//?調(diào)整?bsr.bh.firstItem?并將?bsr?返回到堆
        ???bsr.bh.firstItem?=?append(bsr.bh.firstItem[:0],?bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...)??
        ???heap.Push(&bsm.bsrHeap,?bsr)??
        ???goto?again??
        }

        這里主要解決的問題是多個(gè)有序的字節(jié)數(shù)組(inmemoryPart),按照字節(jié)序排序,合成一個(gè) inmemoryPart 的過程,在 merge 的過程中,每 64KB 會(huì)單獨(dú)創(chuàng)建一個(gè) blockHeader,用于快速索引該 block 里面的 Items。

        持久化數(shù)據(jù)

        最后重復(fù)上面的過程,將 n 個(gè) inmemoryBlock 合并成 (n-1)/defaultPartsToMerge+1 個(gè) inmemoryPart,最后再調(diào)用 mergeParts 函數(shù)完成索引持久化操作,持久化后生成的索引 part,主要包含 metaindex.bin、index.bin、lens.bin、items.binmetadata.json 等 5 個(gè)文件。

        這幾個(gè)文件的關(guān)系如下圖所示, metaindex.bin 文件通過 metaindexRow 索引 index.bin 文件,index.bin 文件通過 indexBlock 中的 blockHeader 同時(shí)索引 items.bin 文件和 items.bin 文件。

        metaindex.bin:文件包含一系列的 metaindexRow 數(shù)據(jù),每個(gè) metaindexRow 中包含第一條數(shù)據(jù) firstItem、索引塊包含的塊頭部數(shù) blockHeadersCount、索引塊偏移 indexBlockOffset 以及索引塊大小 indexBlockSize。

        • metaindexRow 在文件中按照 firstItem 的大小的字典序排序存儲(chǔ),以支持二分查找
        • metaindex.bin 文件使用 ZSTD 進(jìn)行壓縮
        • metaindex.bin 文件中的內(nèi)容在 part 打開時(shí),會(huì)全部讀出加載至內(nèi)存中,以加速查詢過濾
        • metaindexRow 包含的 firstItem 為其索引的 indexBlock 中所有 blockHeader 中字典序最小的 firstItem
        • 查找時(shí)根據(jù) firstItem 進(jìn)行二分檢索

        index.bin:文件中包含一系列的 indexBlock, 每個(gè) indexBlock 又包含一系列 blockHeader,每個(gè) blockHeader 包含 item 的公共前綴 commonPrefix、第一項(xiàng)數(shù)據(jù) firstItemitemsData 的序列化類型 marshalType、itemsData 包含的 item 數(shù)、item 塊的偏移 itemsBlockOffset 等內(nèi)容,就是前面使用將 inmemoryBlock 轉(zhuǎn)換為 inmemoryPart 結(jié)構(gòu)的 Init 函數(shù)得到的。

        • 每個(gè) indexBlock 使用 ZSTD 壓縮算法進(jìn)行壓縮
        • indexBlock 中查找時(shí),根據(jù) firstItem 進(jìn)行二分檢索 blockHeader

        items.bin 文件中,包含一系列的 itemsData, 每個(gè) itemsData 又包含一系列的 Item。

        • itemsData 會(huì)視情況而定來是否使用 ZTSD 壓縮,當(dāng) item 個(gè)數(shù)小于 2 時(shí),或者 itemsData 的長(zhǎng)度小于 64 字節(jié)時(shí),不壓縮;當(dāng) itemsData 使用 ZSTD 壓縮后的壓縮率大于90%的時(shí)候也不壓縮
        • 每個(gè) item 在存儲(chǔ)時(shí),去掉了 blockHeader 中的公共前綴 commonPrefix 以提高壓縮率

        lens.bin 文件中,包含一系列的 lensData, 每個(gè) lensData 又包含一系列 8 字節(jié)的長(zhǎng)度 len, 長(zhǎng)度 len 標(biāo)識(shí) items.bin 文件中對(duì)應(yīng) item 的長(zhǎng)度。在讀取或者需要解析 itemsData 中的 item 時(shí),先要讀取對(duì)應(yīng)的 lensData 中對(duì)應(yīng)的長(zhǎng)度 len。 當(dāng) itemsData 進(jìn)行壓縮時(shí),lensData 會(huì)先使用異或算法進(jìn)行壓縮,然后再使用 ZSTD 算法進(jìn)一步壓縮。

        到這里我們就了解了索引數(shù)據(jù)是實(shí)現(xiàn)和存儲(chǔ)原理了,那么真正的指標(biāo)數(shù)據(jù)又是如何去存儲(chǔ)的呢?未完待續(xù)......

        瀏覽 63
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            我被继夫添我阳道舒服电影 | 久久性爱一区 | 日本草逼视频 | 免费观看成人a毛片A片涩涩屋 | 久久无码一区二区三区 | 91黄色精品 | 国产男女无遮挡猛进猛出图集 | 日韩激情视频 | 久久免费视频观看 | 三上悠亚作品在线看 |