1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        從零實現(xiàn)一個時序數(shù)據(jù)庫

        共 42479字,需瀏覽 85分鐘

         ·

        2021-08-06 07:10

        時序數(shù)據(jù)庫(TSDB: Time Series Database)大多數(shù)時候都是為了滿足監(jiān)控場景的需求,這里先介紹兩個概念:

        • 數(shù)據(jù)點(Point): 時序數(shù)據(jù)的數(shù)據(jù)點是一個包含 (Timestamp:int64, Value:float64) 的二元組。
        • 時間線(Series): 不同標簽(Label)的組合稱為不同的時間線,如
        series1: {"__name__": "netspeed", "host": "localhost", "iface": "eth0"}
        series2: {"__name__": "netspeed", "host": "localhost", "iface": "eth1"}

        Prometheus, InfluxDB, M3, TimescaleDB 都是時下流行的 TSDB。時序數(shù)據(jù)的壓縮算法很大程度上決定了 TSDB 的性能,以上幾個項目的實現(xiàn)都參考了 Fackbook 2015 年發(fā)表的論文《Gorilla: A fast, scalable, in-memory time series database》(http://www.vldb.org/pvldb/vol8/p1816-teller.pdf) 中提到的差值算法,該算法平均可以將 16 字節(jié)的數(shù)據(jù)點壓縮成 1.37 字節(jié)。

        Who's mando?

        Din Djarin, also known as "the Mandalorian" or simply "Mando," was a human male Mandalorian who worked as a famous bounty hunter during the New Republic Era.

        What's mandodb?

        mandodb(https://github.com/chenjiandongx/mandodb) 是我在學習過程中實現(xiàn)的一個最小化的 TSDB,從概念上來講它還算不上是一個完整的 TSDB,因為它:

        • 沒有實現(xiàn)自己的查詢引擎(實現(xiàn)難度大)
        • 缺少磁盤歸檔文件 Compact 操作(有空的話會實現(xiàn))
        • 沒有 WAL 作為災備保證高可用(心情好的話會實現(xiàn))

        mandodb 主要受到了兩個項目的啟發(fā)。本項目僅限于學習用途,未經(jīng)生產(chǎn)環(huán)境測試驗證!

        • nakabonne/tstorage
        • prometheus/prometheus

        prometheus 的核心開發(fā)者 Fabian Reinartz 寫了一篇文章 《Writing a Time Series Database from Scratch》(https://fabxc.org/tsdb/) 來介紹 prometheus TSDB 的演變過程,非常值得一讀,強烈推薦。

        ?? 數(shù)據(jù)模型 & API 文檔

        數(shù)據(jù)模型定義

        // Point 表示一個數(shù)據(jù)點 (ts, value) 二元組
        type Point struct {
         Ts    int64 // in seconds
         Value float64
        }

        // Label 代表一個標簽組合
        type Label struct {
         Name  string
         Value string
        }

        // Row 一行時序數(shù)據(jù) 包括數(shù)據(jù)點和標簽組合
        type Row struct {
         Metric string
         Labels LabelSet
         Point  Point
        }

        // LabelSet 表示 Label 組合
        type LabelSet []Label

        // LabelMatcher Label 匹配器 支持正則
        type LabelMatcher struct {
         Name   string
         Value  string
         IsRegx bool
        }

        // LabelMatcherSet 表示 LabelMatcher 組合
        type LabelMatcherSet []LabelMatcher

        API

        // InsertRows 寫數(shù)據(jù)
        InsertRows(rows []*Row) error 

        // QueryRange 查詢時序數(shù)據(jù)點
        QueryRange(metric string, lms LabelMatcherSet, start, end int64) ([]MetricRet, error)

        // QuerySeries 查詢時序序列組合
        QuerySeries(lms LabelMatcherSet, start, end int64) ([]map[string]string, error)

        // QueryLabelValues 查詢標簽值
        QueryLabelValues(label string, start, end int64) []string

        ?? 配置選項

        配置項在初始化 TSDB 的時候設(shè)置。

        // WithMetaSerializerType 設(shè)置 Metadata 數(shù)據(jù)的序列化類型
        // 目前只提供了 BinaryMetaSerializer
        WithMetaSerializerType(t MetaSerializerType) Option 

        // WithMetaBytesCompressorType 設(shè)置字節(jié)數(shù)據(jù)的壓縮算法
        // 目前提供了
        // * 不壓縮: NoopBytesCompressor(默認)
        // * ZSTD: ZstdBytesCompressor
        // * Snappy: SnappyBytesCompressor
        WithMetaBytesCompressorType(t BytesCompressorType) Option

        // WithOnlyMemoryMode 設(shè)置是否默認只存儲在內(nèi)存中
        // 默認為 false
        WithOnlyMemoryMode(memoryMode bool) Option

        // WithEnabledOutdated 設(shè)置是否支持亂序?qū)懭?nbsp;此特性會增加資源開銷 但會提升數(shù)據(jù)完整性
        // 默認為 true
        WithEnabledOutdated(outdated bool) Option

        // WithMaxRowsPerSegment 設(shè)置單 Segment 最大允許存儲的點數(shù)
        // 默認為 19960412(夾雜私貨 ??)
        WithMaxRowsPerSegment(n int64) Option

        // WithDataPath 設(shè)置 Segment 持久化存儲文件夾
        // 默認為 "."
        WithDataPath(d string) Option

        // WithRetention 設(shè)置 Segment 持久化數(shù)據(jù)保存時長
        // 默認為 7d
        WithRetention(t time.Duration) Option

        // WithWriteTimeout 設(shè)置寫入超時閾值
        // 默認為 30s
        WithWriteTimeout(t time.Duration) Option

        // WithLoggerConfig 設(shè)置日志配置項
        // logger: github.com/chenjiandongx/logger
        WithLoggerConfig(opt *logger.Options) Option

        ?? 用法示例

        package main

        import (
         "fmt"
         "time"

         "github.com/chenjiandongx/mandodb"
        )

        func main() {
         store := mandodb.OpenTSDB(
          mandodb.WithOnlyMemoryMode(true),
          mandodb.WithWriteTimeout(10*time.Second),
         )
         defer store.Close()

         // 插入數(shù)據(jù)
         _ = store.InsertRows([]*mandodb.Row{
          {
           Metric: "cpu.busy",
           Labels: []mandodb.Label{
            {Name: "node", Value: "vm1"},
            {Name: "dc", Value: "gz-idc"},
           },
           Point: mandodb.Point{Ts: 1600000001, Value: 0.1},
          },
          {
           Metric: "cpu.busy",
           Labels: []mandodb.Label{
            {Name: "node", Value: "vm2"},
            {Name: "dc", Value: "sz-idc"},
           },
           Point: mandodb.Point{Ts: 1600000001, Value: 0.1},
          },
         })

         time.Sleep(time.Millisecond)

         // 時序數(shù)據(jù)查詢
         data, _ := store.QueryRange("cpu.busy"nil16000000001600000002)
         fmt.Printf("data: %+v\n", data)
         // output:
         // data: [{Labels:{__name__="cpu.busy", dc="gz-idc", node="vm1"} Points:[{Ts:1600000001 Value:0.1}]}]

         // 查詢 Series
         // __name__ 是 metric 名稱在 TSDB 中的 Label Key
         ser, _ := store.QuerySeries(
                mandodb.LabelMatcherSet{{Name: "__name__", Value: "cpu.busy"}}, 16000000001600000002)
         for _, d := range ser {
          fmt.Printf("data: %+v\n", d)
         }
         // output:
         // data: map[__name__:cpu.busy dc:gz-idc node:vm1]
         // data: map[__name__:cpu.busy dc:sz-idc node:vm2]

         // 查詢標簽值
         lvs := store.QueryLabelValues("node"16000000001600000002)
         fmt.Printf("data: %+v\n", lvs)
         // output:
         // data: [vm1 vm2]
        }

        下面是我對這段時間學習內(nèi)容的整理,嘗試完整介紹如何從零開始實現(xiàn)一個小型的 TSDB。

        我本身并沒有數(shù)據(jù)庫開發(fā)的背景,某些描述可能并不那么準確,所以歡迎 實名 diss 指正。

        ?? Gorilla 差值算法

        Gorilla 論文 4.1 小節(jié)介紹了壓縮算法,先整體看一下壓縮方案,T/V 是緊挨存儲的,'0'/'10'/'11' 表示控制位。

        Figure: Gorilla 壓縮算法

        Timestamp DOD 壓縮:

        在時序的場景中,每個時序點都有一個對應的 Timestamp,一條時序序列中相鄰數(shù)據(jù)點的間隔是有規(guī)律可循的。一般來講,監(jiān)控數(shù)據(jù)的采集都是會以固定的時間間隔進行的,所以就可以用差值來記錄時間間隔,更進一步,我們可以用差值的差值來記錄以此來減少存儲空間。

        t1: 1627401800; t2: 1627401810; t3: 1627401820; t4: 1627401830
        --------------------------------------------------------------
        // 差值:delta
        t1: 1627401800; (t2-t1)d1: 10; (t3-t2)d2: 10; (t4-t3)d3: 10
        --------------------------------------------------------------
        // 差值的差值:delta of delta
        t1: 1627401800; dod1: 0; dod2: 0; dod3: 0

        實際環(huán)境中當然不可能每個間隔都這么均勻,由于網(wǎng)絡(luò)延遲等其他原因,差值會有波動。

        Value XOR 壓縮:

        Figure: IEEE 浮點數(shù)以及 XOR 計算結(jié)果

        當兩個數(shù)據(jù)點數(shù)值值比較接近的話,通過異或操作計算出來的結(jié)果是比較相似的,利用這點就可以通過記錄前置零和后置零個數(shù)以及數(shù)值部分來達到壓縮空間的目的。

        下面通過算法實現(xiàn)來介紹,代碼來自項目 dgryski/go-tsz。代碼完全按照論文中給出的步驟來實現(xiàn)。

        // New 初始化 block 這里會將第一個原始時間戳寫入到 block 中
        func New(t0 uint32) *Series {
         s := Series{
          T0:      t0,
          leading: ^uint8(0),
         }

         s.bw.writeBits(uint64(t0), 32)
         return &s
        }

        // Push 負責寫入時序數(shù)據(jù)
        func (s *Series) Push(t uint32, v float64) {
         // ....
         // 如果是第一個數(shù)據(jù)點的話寫入原始數(shù)據(jù)后直接返回
         if s.t == 0 {
          s.t = t
          s.val = v
          s.tDelta = t - s.T0 // 實際上這里為 0

          // The block header stores the starting time stamp, t-1(前一個時間戳),
          // which is aligned to a two hour window; the first time
          // stamp, t0, in the block is stored as a delta from t?1 in 14 bits.
                
          // 用 14 個 bit 寫入時間戳差值
          s.bw.writeBits(uint64(s.tDelta), 14)
          // 原始數(shù)據(jù)點完整寫入
          s.bw.writeBits(math.Float64bits(v), 64)
          return
         }

         tDelta := t - s.t
         dod := int32(tDelta - s.tDelta) // 計算差值的差值 Detla of Delta

         // 下面開始就處理非第一個數(shù)據(jù)點的情況了
         switch {
          // If D is zero, then store a single ‘0’ bit
          // 如果是零的話 那直接用 '0' 一個字節(jié)就可以直接表示
         case dod == 0:
          s.bw.writeBit(zero)

          //  If D is between [-63, 64], store ‘10’ followed by the value (7 bits)
         case -63 <= dod && dod <= 64:
          s.bw.writeBits(0x022// 控制位 '10'
          s.bw.writeBits(uint64(dod), 7// 7bits 可以表示 [-63, 64] 的范圍

          // If D is between [-255, 256], store ‘110’ followed by the value (9 bits)
         case -255 <= dod && dod <= 256:
          s.bw.writeBits(0x063// 控制位 '110'
          s.bw.writeBits(uint64(dod), 9)

          // if D is between [-2047, 2048], store ‘1110’ followed by the value (12 bits)
         case -2047 <= dod && dod <= 2048:
          s.bw.writeBits(0x0e4// 控制位 '1110'
          s.bw.writeBits(uint64(dod), 12)

          // Otherwise store ‘1111’ followed by D using 32 bits
         default:
          s.bw.writeBits(0x0f4// 其余情況控制位均用 '1111'
          s.bw.writeBits(uint64(dod), 32)
         }

         // 到這里 (T, V) 中的時間戳已經(jīng)寫入完畢了 接下來是寫 V 部分

         // 先計算兩個值的異或結(jié)果
         vDelta := math.Float64bits(v) ^ math.Float64bits(s.val)

         // If XOR with the previous is zero (same value), store single ‘0’ bit
         // 如果前后兩個值相等的話 直接用 '0' 1 個 bit 就可以表示
         // 所以如果上報的時序數(shù)據(jù)是 1 或者 0 這種的話 占用的內(nèi)存會非常少

         // zero = '0'; one = '1'
         if vDelta == 0 {
          s.bw.writeBit(zero)
         } else {    // 非 0 情況那就要把控制位置為 1
          s.bw.writeBit(one)

          // 計算前置 0 和后置 0
          leading := uint8(bits.LeadingZeros64(vDelta))
          trailing := uint8(bits.TrailingZeros64(vDelta))

          // clamp number of leading zeros to avoid overflow when encoding
          if leading >= 32 {
           leading = 31
          }

          // (Control bit ‘0’) If the block of meaningful bits
          // falls within the block of previous meaningful bits,
          // i.e., there are at least as many leading zeros and
          // as many trailing zeros as with the previous value,
          // use that information for the block position and
          // just store the meaningful XORed value.

          // 如果前置 0 不小于上一個值計算的異或結(jié)果的前置 0 且后置 0 也不小于上一個值計算的異或結(jié)果的后置 0
          if s.leading != ^uint8(0) && leading >= s.leading && trailing >= s.trailing { // => 控制位 '10'
           s.bw.writeBit(zero)
           // 記錄異或值非零部分
           s.bw.writeBits(vDelta>>s.trailing, 64-int(s.leading)-int(s.trailing))
          } else { // => 控制位 '11'

           // (Control bit ‘1’) Store the length of the number
           // of leading zeros in the next 5 bits, then store the
           // length of the meaningful XORed value in the next
           // 6 bits. Finally store the meaningful bits of the XORed value.
           s.leading, s.trailing = leading, trailing

           // 其他情況控制位置為 1 并用接下來的 5bits 記錄前置 0 個數(shù)
           s.bw.writeBit(one)
           s.bw.writeBits(uint64(leading), 5)

           // 然后用接下來的 6bits 記錄異或差值中的非零部分
           sigbits := 64 - leading - trailing
           s.bw.writeBits(uint64(sigbits), 6)
           s.bw.writeBits(vDelta>>trailing, int(sigbits))
          }
         }

         // 狀態(tài)更新 至此(T, V)均已被壓縮寫入到內(nèi)存中
         s.tDelta = tDelta
         s.t = t
         s.val = v
        }

        // 每個 block 的結(jié)尾會使用特殊標記用于標識
        func finish(w *bstream) {
         // write an end-of-stream record
         w.writeBits(0x0f4)
         w.writeBits(0xffffffff32)
         w.writeBit(zero)
        }

        論文給出了不同 case 的 buckets 占比分布。

        Figure: Timestamp buckets distribution

        Figure: Value buckets distribution

        Timestamp buckets 中,前后兩個時間戳差值相同的比例高達 96.39%,而在 Value buckets 中只用一個控制位的占比也達到了 59.06%,可見其壓縮比之高。

        論文還給出了一個重要結(jié)論,數(shù)據(jù)壓縮比隨著時間的增長而增長,并在 120 個點的時候開始收斂到一個最佳值。

        Figure: 壓縮率曲線

        Gorilla 差值算法也應用于我的另外一個項目 chenjiandongx/tszlist,一種時序數(shù)據(jù)線程安全鏈表。

        ?? 數(shù)據(jù)寫入

        時序數(shù)據(jù)具有「垂直寫,水平查」的特性,即同一時刻有多條時間線的數(shù)據(jù)不斷被追加。但查詢的時候往往是查某條時間線持續(xù)一段時間內(nèi)的數(shù)據(jù)點。

        series
          ^   
          │   . . . . . . . . . . . . . . . . .   . . . . .   {__name__="request_total", method="GET"}
          │     . . . . . . . . . . . . . . . . . . . . . .   {__name__="request_total", method="POST"}
          │         . . . . . . .
          │       . . .     . . . . . . . . . . . . . . . .                  ... 
          │     . . . . . . . . . . . . . . . . .   . . . .   
          │     . . . . . . . . . .   . . . . . . . . . . .   {__name__="errors_total", method="POST"}
          │           . . .   . . . . . . . . .   . . . . .   {__name__="errors_total", method="GET"}
          │         . . . . . . . . .       . . . . .
          │       . . .     . . . . . . . . . . . . . . . .                  ... 
          │     . . . . . . . . . . . . . . . .   . . . . 
          v
            <-------------------- time --------------------->

        時序數(shù)據(jù)跟時間是強相關(guān)的(不然還叫時序數(shù)據(jù)???),即大多數(shù)查詢其實只會查詢最近時刻的數(shù)據(jù),這里的「最近」是個相對概念。所以沒必要維護一條時間線的完整生命周期,特別是在 Kubernetes 這種云原生場景,Pod 隨時有可能會被擴縮容,也就意味著一條時間線的生命周期可能會很短。如果我們一直記錄著所有的時間線的索引信息,那么隨著時間的推移,數(shù)據(jù)庫里的時間線的數(shù)量會呈現(xiàn)一個線性增長的趨勢 ??,會極大地影響查詢效率。

        這里引入一個概念「序列分流」,這個概念描述的是一組時間序列變得不活躍,即不再接收數(shù)據(jù)點,取而代之的是有一組新的活躍的序列出現(xiàn)的場景。

        series
          ^
          │   . . . . . .
          │   . . . . . .
          │   . . . . . .
          │               . . . . . . .
          │               . . . . . . .
          │               . . . . . . .
          │                             . . . . . .
          │                             . . . . . .
          │                                         . . . . .
          │                                         . . . . .
          │                                         . . . . .
          v
            <-------------------- time --------------------->

        我們將多條時間線的數(shù)據(jù)按一定的時間跨度切割成多個小塊,每個小塊本質(zhì)就是一個獨立小型的數(shù)據(jù)庫,這種做法另外一個優(yōu)勢是清除過期操作的時候非常方便,只要將整個塊給刪了就行 ??(梭哈是一種智慧)。內(nèi)存中保留最近兩個小時的熱數(shù)據(jù)(Memory Segment),其余數(shù)據(jù)持久化到磁盤(Disk Segment)。

        Figure: 序列分塊

        DiskSegment 使用的是 AVL Tree 實現(xiàn)的列表,可在插入時排序。為什么不用更加高大上的紅黑樹?因為不好實現(xiàn)...

        當 Memory Segment 達到歸檔條件的時候,會創(chuàng)建一個新的內(nèi)存塊并異步將剛歸檔的塊寫入到磁盤,同時會使用 mmap 將磁盤文件句柄映射到內(nèi)存中。代碼實現(xiàn)如下。

        func (tsdb *TSDB) getHeadPartition() (Segment, error) {
         tsdb.mut.Lock()
         defer tsdb.mut.Unlock()

         if tsdb.segs.head.Frozen() {
          head := tsdb.segs.head

          go func() {
           tsdb.wg.Add(1)
           defer tsdb.wg.Done()

           tsdb.segs.Add(head)

           t0 := time.Now()
           dn := dirname(head.MinTs(), head.MaxTs())

           if err := writeToDisk(head.(*memorySegment)); err != nil {
            logger.Errorf("failed to flush data to disk, %v", err)
            return
           }

           fname := path.Join(dn, "data")
           mf, err := mmap.OpenMmapFile(fname)
           if err != nil {
            logger.Errorf("failed to make a mmap file %s, %v", fname, err)
            return
           }

           tsdb.segs.Remove(head)
           tsdb.segs.Add(newDiskSegment(mf, dn, head.MinTs(), head.MaxTs()))
           logger.Infof("write file %s take: %v", fname, time.Since(t0))
          }()

          tsdb.segs.head = newMemorySegment()
         }

         return tsdb.segs.head, nil
        }

        Figure: Memory Segment 兩部分數(shù)據(jù)

        寫入的時候支持數(shù)據(jù)時間回撥,也就是支持有限的亂序數(shù)據(jù)寫入,實現(xiàn)方案是在內(nèi)存中對還沒歸檔的每條時間線維護一個鏈表(同樣使用 AVL Tree 實現(xiàn)),當數(shù)據(jù)點的時間戳不是遞增的時候存儲到鏈表中,查詢的時候會將兩部分數(shù)據(jù)合并查詢,持久化的時候也會將兩者合并寫入。

        ?? Mmap 內(nèi)存映射

        mmap 是一種將磁盤文件映射到進程的虛擬地址空間來實現(xiàn)對文件讀取和修改操作的技術(shù)。

        從 Linux 角度來看,操作系統(tǒng)的內(nèi)存空間被分為「內(nèi)核空間」和「用戶空間」兩大部分,其中內(nèi)核空間和用戶空間的空間大小、操作權(quán)限以及核心功能都不相同。這里的內(nèi)核空間是指操作系統(tǒng)本身使用的內(nèi)存空間,而用戶空間則是提供給各個進程使用的內(nèi)存空間。由于用戶進程不具有訪問內(nèi)核資源的權(quán)限,例如訪問硬件資源,因此當一個用戶進程需要使用內(nèi)核資源的時候,就需要通過 系統(tǒng)調(diào)用 來完成。

        虛擬內(nèi)存細節(jié)可以閱讀 《虛擬內(nèi)存精粹》 這篇文章。

        Figure: 常規(guī)文件操作和 mmap 操作的區(qū)別

        image

        常規(guī)文件操作

        讀文件: 用戶進程首先執(zhí)行 read(2) 系統(tǒng)調(diào)用,會進行系統(tǒng)上下文環(huán)境切換,從用戶態(tài)切換到內(nèi)核態(tài),之后由 DMA 將文件數(shù)據(jù)從磁盤讀取到內(nèi)核緩沖區(qū),再將內(nèi)核空間緩沖區(qū)的數(shù)據(jù)復制到用戶空間的緩沖區(qū)中,最后 read(2) 系統(tǒng)調(diào)用返回,進程從內(nèi)核態(tài)切換到用戶態(tài),整個過程結(jié)束。

        寫文件: 用戶進程發(fā)起 write(2) 系統(tǒng)調(diào)用,從用戶態(tài)切換到內(nèi)核態(tài),將數(shù)據(jù)從用戶空間緩沖區(qū)復制到內(nèi)核空間緩沖區(qū),接著 write(2) 系統(tǒng)調(diào)用返回,同時進程從內(nèi)核態(tài)切換到用戶態(tài),數(shù)據(jù)從內(nèi)核緩沖區(qū)寫入到磁盤,整個過程結(jié)束。

        mmap 操作

        mmap 內(nèi)存映射的實現(xiàn)過程,總的來說可以分為三個階段:

        1. 進程啟動映射過程,并在虛擬地址空間中為映射創(chuàng)建虛擬映射區(qū)域。
        2. 執(zhí)行內(nèi)核空間的系統(tǒng)調(diào)用函數(shù) mmap,建立文件物理地址和進程虛擬地址的一一映射關(guān)系。
        3. 進程發(fā)起對這片映射空間的訪問,引發(fā)缺頁異常,實現(xiàn)文件內(nèi)容到物理內(nèi)存的拷貝。

        ?? 小結(jié)

        常規(guī)文件操作為了提高讀寫效率和保護磁盤,使用了頁緩存機制。這樣造成讀文件時需要先將文件頁從磁盤拷貝到頁緩存中,由于頁緩存處在內(nèi)核空間,不能被用戶進程直接尋址,所以還需要將頁緩存中數(shù)據(jù)頁再次拷貝到內(nèi)存對應的用戶空間中。這樣,通過了兩次數(shù)據(jù)拷貝過程,才能完成進程對文件內(nèi)容的獲取任務。寫操作也是一樣,待寫入的 buffer 在內(nèi)核空間不能直接訪問,必須要先拷貝至內(nèi)核空間對應的主存,再寫回磁盤中(延遲寫回),也是需要兩次數(shù)據(jù)拷貝。

        而使用 mmap 操作文件,創(chuàng)建新的虛擬內(nèi)存區(qū)域和建立文件磁盤地址和虛擬內(nèi)存區(qū)域映射這兩步,沒有任何文件拷貝操作。而之后訪問數(shù)據(jù)時發(fā)現(xiàn)內(nèi)存中并無數(shù)據(jù)而發(fā)起的缺頁異常過程,可以通過已經(jīng)建立好的映射關(guān)系,只使用一次數(shù)據(jù)拷貝,就從磁盤中將數(shù)據(jù)傳入內(nèi)存的用戶空間中,供進程使用。

        ?? 總而言之,常規(guī)文件操作需要從磁盤到頁緩存再到用戶主存的兩次數(shù)據(jù)拷貝。而 mmap 操控文件只需要從磁盤到用戶主存的一次數(shù)據(jù)拷貝過程。mmap 的關(guān)鍵點是實現(xiàn)了「用戶空間」和「內(nèi)核空間」的數(shù)據(jù)直接交互而省去了不同空間數(shù)據(jù)復制的開銷

        ?? 索引設(shè)計

        TSDB 的查詢,是通過 Label 組合來鎖定到具體的時間線進而確定分塊偏移檢索出數(shù)據(jù)。

        • Sid(MetricHash/-/LabelHash) 是一個 Series 的唯一標識。
        • Label(Name/-/Value) => vm="node1"; vm="node2"; iface="eth0"。

        在傳統(tǒng)的關(guān)系型數(shù)據(jù)庫,索引設(shè)計可能是這樣的。

        Sid(主鍵)Label1Label2Label3Label4...LabelN
        sid1×××
        ...×
        sid2
        ×××...×
        sid3××
        ×...×
        sid4×
        ××...×

        時序數(shù)據(jù)是 NoSchema 的,沒辦法提前建表和定義數(shù)據(jù)模型 ??,因為我們要支持用戶上報任意 Label 組合的數(shù)據(jù),這樣的話就沒辦法進行動態(tài)的擴展了?;蛟S你會靈光一現(xiàn) ?,既然這樣,那把 Labels 放一個字段拼接起來不就可以無限擴展啦,比如下面這個樣子。

        Sid(主鍵)Labels
        sid1label1, label2, label3, ...
        sid2label2, label3, label5, ...
        sid3label4, label6, label9, ...
        sid4label2, label3, label7, ...

        喲嚯,乍一看沒毛病,靚仔竊喜。

        不對,有問題 ??,要定位到其中的某條時間線,那我是不是得全表掃描一趟。而且這種設(shè)計還有另外一個弊病,就是會導致內(nèi)存激增,Label 的 Name 和 Value 都可能是特別長的字符串。

        那怎么辦呢(?? 靚仔沉默...),剎那間我的腦中閃過一個帥氣的身影,沒錯,就是你,花澤類「只要倒立眼淚就不會流出來」。

        我悟了!要學會逆向思維 ??,把 Label 當做主鍵,Sid 當做其字段不就好了。這其實有點類似于 ElasticSearch 中的倒排索引,主鍵為 Keyword,字段為 DocumentID。索引設(shè)計如下。

        Label(主鍵)Sids
        label1: {vm="node1"}sid1, sid2, sid3, ...
        label2: {vm="node2"}sid2, sid3, sid5, ...
        label3: {iface="eth0"}sid3, sid5, sid9, ...
        label4: {iface="eth1"}sid2, sid3, sid7, ...

        Label 作為主鍵時會建立索引(Hashkey),查找的效率可視為 O(1),再根據(jù)鎖定的 Label 來最終確定想要的 Sid。舉個例子,我們想要查找 {vm="node1", iface="eth0"} 的時間線的話就可以快速定位到 Sids(忽略其他 ... sid)。

        sid1; sid2; sid3
        sid2; sid3; sid5

        兩者求一個交集,就可以得到最終要查詢的 Sid 為 sid2sid3。?? Nice!

        假設(shè)我們的查詢只支持相等匹配的話,格局明顯就小了 ??。查詢條件是 {vm=~"node*", iface="eth0"} 腫么辦?對 label1、label2、label3 和 label4 一起求一個并集嗎?顯然不是,因為這樣算的話那結(jié)果就是 sid3。

        厘清關(guān)系就不難看出,只要對相同的 Label Name 做并集然后再對不同的 Label Name 求交集就可以了。這樣算的正確結(jié)果就是 sid3sid5。實現(xiàn)的時候用到了 Roaring Bitmap,一種優(yōu)化的位圖算法。

        Memory Segment 索引匹配

        func (mim *memoryIndexMap) MatchSids(lvs *labelValueSet, lms LabelMatcherSet) []string {
         // ...
         sids := newMemorySidSet()
         var got bool
         for i := len(lms) - 1; i >= 0; i-- {
          tmp := newMemorySidSet()
          vs := lvs.Match(lms[i])
          // 對相同的 Label Name 求并集
          for _, v := range vs {
           midx := mim.idx[joinSeparator(lms[i].Name, v)]
           if midx == nil || midx.Size() <= 0 {
            continue
           }

           tmp.Union(midx.Copy())
          }

          if tmp == nil || tmp.Size() <= 0 {
           return nil
          }

          if !got {
           sids = tmp
           got = true
           continue
          }

          // 對不同的 Label Name 求交集
          sids.Intersection(tmp.Copy())
         }

         return sids.List()
        }

        Disk Segment 索引匹配

        func (dim *diskIndexMap) MatchSids(lvs *labelValueSet, lms LabelMatcherSet) []uint32 {
         // ...

         lst := make([]*roaring.Bitmap, 0)
         for i := len(lms) - 1; i >= 0; i-- {
          tmp := make([]*roaring.Bitmap, 0)
          vs := lvs.Match(lms[i])

          // 對相同的 Label Name 求并集
          for _, v := range vs {
           didx := dim.label2sids[joinSeparator(lms[i].Name, v)]
           if didx == nil || didx.set.IsEmpty() {
            continue
           }

           tmp = append(tmp, didx.set)
          }

          union := roaring.ParOr(4, tmp...)
          if union.IsEmpty() {
           return nil
          }

          lst = append(lst, union)
         }

         // 對不同的 Label Name 求交集
         return roaring.ParAnd(4, lst...).ToArray()
        }

        然而,確定相同的 LabelName 也是一個問題,因為 Label 本身就代表著 Name:Value,難不成我還要遍歷所有 label 才能確定嘛,這不就又成了全表掃描???

        沒有什么問題是一個索引解決不了的,如果有,那就再增加一個索引。--- 魯迅。

        只要我們保存 Label 的 Name 對應的 Value 列表的映射關(guān)系即可高效解決這個問題。

        LabelNameLabelValue
        vmnode1, node2, ...
        ifaceeth0, eth1, ...

        還是上面的 {vm=~"node1|node2", iface="eth0"} 查詢,第一步通過正則匹配確定匹配到 node1, node2,第二步匹配到 eth0,再將 LabelName 和 LabelValue 一拼裝,Label 就出來了,?? 完事!

        橋豆麻袋!還有一個精彩的正則匹配優(yōu)化算法沒介紹。

        fastRegexMatcher 是一種優(yōu)化的正則匹配器,算法來自 Prometheus。

        // 思路就是盡量先執(zhí)行前綴匹配和后綴匹配 能不用正則就不用正則
        // 如 label 表達式為 {vm="node*"}
        // 而我們此時內(nèi)存中有 vm=node1, vm=node2, vm=foo, vm=bar,那這個時候只需要前綴匹配就能直接把 vm=foo,vm=bar 給過濾了
        // 畢竟前綴匹配和后綴匹配的執(zhí)行效率還是比正則高不少的
        type fastRegexMatcher struct {
         re       *regexp.Regexp
         prefix   string
         suffix   string
         contains string
        }

        func newFastRegexMatcher(v string) (*fastRegexMatcher, error) {
         re, err := regexp.Compile("^(?:" + v + ")$")
         if err != nil {
          return nil, err
         }

         parsed, err := syntax.Parse(v, syntax.Perl)
         if err != nil {
          return nil, err
         }

         m := &fastRegexMatcher{
          re: re,
         }

         if parsed.Op == syntax.OpConcat {
          m.prefix, m.suffix, m.contains = optimizeConcatRegex(parsed)
         }

         return m, nil
        }

        // optimizeConcatRegex returns literal prefix/suffix text that can be safely
        // checked against the label value before running the regexp matcher.
        func optimizeConcatRegex(r *syntax.Regexp) (prefix, suffix, contains string) {
         sub := r.Sub

         // We can safely remove begin and end text matchers respectively
         // at the beginning and end of the regexp.
         if len(sub) > 0 && sub[0].Op == syntax.OpBeginText {
          sub = sub[1:]
         }
         if len(sub) > 0 && sub[len(sub)-1].Op == syntax.OpEndText {
          sub = sub[:len(sub)-1]
         }

         if len(sub) == 0 {
          return
         }

         // Given Prometheus regex matchers are always anchored to the begin/end
         // of the text, if the first/last operations are literals, we can safely
         // treat them as prefix/suffix.
         if sub[0].Op == syntax.OpLiteral && (sub[0].Flags&syntax.FoldCase) == 0 {
          prefix = string(sub[0].Rune)
         }
         if last := len(sub) - 1; sub[last].Op == syntax.OpLiteral && (sub[last].Flags&syntax.FoldCase) == 0 {
          suffix = string(sub[last].Rune)
         }

         // If contains any literal which is not a prefix/suffix, we keep the
         // 1st one. We do not keep the whole list of literals to simplify the
         // fast path.
         for i := 1; i < len(sub)-1; i++ {
          if sub[i].Op == syntax.OpLiteral && (sub[i].Flags&syntax.FoldCase) == 0 {
           contains = string(sub[i].Rune)
           break
          }
         }

         return
        }

        func (m *fastRegexMatcher) MatchString(s string) bool {
         if m.prefix != "" && !strings.HasPrefix(s, m.prefix) {
          return false
         }

         if m.suffix != "" && !strings.HasSuffix(s, m.suffix) {
          return false
         }

         if m.contains != "" && !strings.Contains(s, m.contains) {
          return false
         }
         return m.re.MatchString(s)
        }

        ?? 存儲布局

        既然是數(shù)據(jù)庫,那么自然少不了數(shù)據(jù)持久化的特性。了解完索引的設(shè)計,再看看落到磁盤的存儲布局就很清晰了。先跑個示例程序?qū)懭胍恍?shù)據(jù)熱熱身。

        package main

        import (
         "fmt"
         "math/rand"
         "strconv"
         "time"

         "github.com/chenjiandongx/mandodb"
         "github.com/satori/go.uuid"
        )

        // 模擬一些監(jiān)控指標
        var metrics = []string{
         "cpu.busy""cpu.load1""cpu.load5""cpu.load15""cpu.iowait",
         "disk.write.ops""disk.read.ops""disk.used",
         "net.in.bytes""net.out.bytes""net.in.packages""net.out.packages",
         "mem.used""mem.idle""mem.used.bytes""mem.total.bytes",
        }

        // 增加 Label 數(shù)量
        var uid1, uid2, uid3 []string

        func init() {
         for i := 0; i < len(metrics); i++ {
          uid1 = append(uid1, uuid.NewV4().String())
          uid2 = append(uid2, uuid.NewV4().String())
          uid3 = append(uid3, uuid.NewV4().String())
         }
        }

        func genPoints(ts int64, node, dc int) []*mandodb.Row {
         points := make([]*mandodb.Row, 0)
         for idx, metric := range metrics {
          points = append(points, &mandodb.Row{
           Metric: metric,
           Labels: []mandodb.Label{
            {Name: "node", Value: "vm" + strconv.Itoa(node)},
            {Name: "dc", Value: strconv.Itoa(dc)},
            {Name: "foo", Value: uid1[idx]},
            {Name: "bar", Value: uid2[idx]},
            {Name: "zoo", Value: uid3[idx]},
           },
           Point: mandodb.Point{Ts: ts, Value: float64(rand.Int31n(60))},
          })
         }

         return points
        }

        func main() {
         store := mandodb.OpenTSDB()
         defer store.Close()

         now := time.Now().Unix() - 36000 // 10h ago

         for i := 0; i < 720; i++ {
          for n := 0; n < 5; n++ {
           for j := 0; j < 1024; j++ {
            _ = store.InsertRows(genPoints(now, n, j))
           }
          }

          now += 60 //1min
         }

         fmt.Println("finished")

         select {}
        }

        每個分塊保存在名字為 seg-${mints}-${maxts} 文件夾里,每個文件夾含有 datameta.json 兩個文件。

        • data: 存儲了一個 Segment 的所有數(shù)據(jù),包括數(shù)據(jù)點和索引信息。
        • meta.json: 描述了分塊的時間線數(shù)量,數(shù)據(jù)點數(shù)量以及該塊的數(shù)據(jù)時間跨度。
        ? ?? tree -h seg-*
        seg-1627709713-1627716973
        ├── [ 28M]  data
        └── [ 110]  meta.json
        seg-1627716973-1627724233
        ├── [ 28M]  data
        └── [ 110]  meta.json
        seg-1627724233-1627731493
        ├── [ 28M]  data
        └── [ 110]  meta.json
        seg-1627731493-1627738753
        ├── [ 28M]  data
        └── [ 110]  meta.json
        seg-1627738753-1627746013
        ├── [ 28M]  data
        └── [ 110]  meta.json

        0 directories, 10 files

        ? ?? cat seg-1627709713-1627716973/meta.json -p
        {
            "seriesCount": 81920,
            "dataPointsCount": 9912336,
            "maxTs": 1627716973,
            "minTs": 1627709713
        }

        存儲 8 萬條時間線共接近 1 千萬的數(shù)據(jù)點的數(shù)據(jù)塊占用磁盤 28M。實際上在寫入的時候,一條數(shù)據(jù)是這個樣子的。

        {__name__="cpu.busy", node="vm0", dc="0", foo="bdac463d-8805-4cbe-bc9a-9bf495f87bab", bar="3689df1d-cbf3-4962-abea-6491861e62d2", zoo="9551010d-9726-4b3b-baf3-77e50655b950"1627710454 41

        這樣一條數(shù)據(jù)按照 JSON 格式進行網(wǎng)絡(luò)通信的話,大概是 200Byte,初略計算一下。

        200 * 9912336 = 1982467200Byte = 1890M

        可以選擇 ZSTD 或者 Snappy 算法進行二次壓縮(默認不開啟)。還是上面的示例代碼,不過在 TSDB 啟動的時候指定了壓縮算法。

        ZstdBytesCompressor

        func main() {
         store := mandodb.OpenTSDB(mandodb.WithMetaBytesCompressorType(mandodb.ZstdBytesCompressor))
         defer store.Close()
         // ...
        }

        // 壓縮效果 28M -> 25M
        ? ?? ll seg-1627711905-1627719165
        Permissions Size User          Date Modified Name
        .rwxr-xr-x   25M chenjiandongx  1 Aug 00:13  data
        .rwxr-xr-x   110 chenjiandongx  1 Aug 00:13  meta.json

        SnappyBytesCompressor

        func main() {
         store := mandodb.OpenTSDB(mandodb.WithMetaBytesCompressorType(mandodb.SnappyBytesCompressor))
         defer store.Close()
         // ...
        }

        // 壓縮效果 28M -> 26M
        ? ?? ll seg-1627763918-1627771178
        Permissions Size User          Date Modified Name
        .rwxr-xr-x   26M chenjiandongx  1 Aug 14:39  data
        .rwxr-xr-x   110 chenjiandongx  1 Aug 14:39  meta.json

        多多少少還是有點效果的 ??...

        壓縮是有成本的,壓縮體積的同時會增大 CPU 開銷(mbp 可以煎雞蛋了),減緩寫入速率。

        敲黑板,接下來就要來好好講講 data 文件到底寫了什么東西。 data 存儲布局如下。

        Figure: Segment Stroage

        TOC 描述了 Data Block 和 Meta Block(Series Block + Labels Block)的體積,用于后面對 data 進行解析讀取。Data Block 存儲了每條時間線具體的數(shù)據(jù)點,時間線之間數(shù)據(jù)緊挨存儲。DataContent 就是使用 Gorilla 差值算法壓縮的 block。

        Figure: Data Block

        Labels Block 記錄了具體的 Label 值以及對應 Label 與哪些 Series 相關(guān)聯(lián)。

        Figure: Labels Block

        Series Block 記錄了每條時間線的元數(shù)據(jù),字段解釋如下。

        • SidLength: Sid 的長度。
        • Sid: 時間線的唯一標識。
        • StartOffset: 時間線數(shù)據(jù)塊在 Data Block 中的起始偏移。
        • EndOffset: 時間線數(shù)據(jù)塊在 Data Block 中的終止偏移。
        • LabelCount: 時間線包含的 Label 數(shù)量。
        • Labels: 標簽在 Labels Block 中的序號(僅記錄序號,不記錄具體值)。

        Figure: Series Block

        了解完設(shè)計,再看看 Meta Block 編碼和解編碼的代碼實現(xiàn),binaryMetaSerializer 實現(xiàn)了 MetaSerializer 接口。

        type MetaSerializer interface {
         Marshal(Metadata) ([]byte, error)
         Unmarshal([]byte, *Metadata) error
        }

        編碼 Metadata

        const (
         endOfBlock uint16 = 0xffff
         uint16Size        = 2
         uint32Size        = 4
         uint64Size        = 8

         magic = "https://github.com/chenjiandongx/mandodb"
        )

        func (s *binaryMetaSerializer) Marshal(meta Metadata) ([]byte, error) {
         encf := newEncbuf()

         // labels block
         labelOrdered := make(map[string]int)
         for idx, row := range meta.Labels {
          labelOrdered[row.Name] = idx
          encf.MarshalUint16(uint16(len(row.Name)))
          encf.MarshalString(row.Name)
          encf.MarshalUint32(uint32(len(row.Sids)))
          encf.MarshalUint32(row.Sids...)
         }
         encf.MarshalUint16(endOfBlock)

         // series block
         for idx, series := range meta.Series {
          encf.MarshalUint16(uint16(len(series.Sid)))
          encf.MarshalString(series.Sid)
          encf.MarshalUint64(series.StartOffset, series.EndOffset)

          rl := meta.sidRelatedLabels[idx]
          encf.MarshalUint32(uint32(rl.Len()))

          lids := make([]uint320, rl.Len())
          for _, lb := range rl {
           lids = append(lids, uint32(labelOrdered[lb.MarshalName()]))
          }

          sort.Slice(lids, func(i, j int) bool {
           return lids[i] < lids[j]
          })
          encf.MarshalUint32(lids...)
         }
         encf.MarshalUint16(endOfBlock)

         encf.MarshalUint64(uint64(meta.MinTs))
         encf.MarshalUint64(uint64(meta.MaxTs))
         encf.MarshalString(magic)   // <-- magic here

         return ByteCompress(encf.Bytes()), nil
        }

        解碼 Metadata

        func (s *binaryMetaSerializer) Unmarshal(data []byte, meta *Metadata) error {
         data, err := ByteDecompress(data)
         if err != nil {
          return ErrInvalidSize
         }

         if len(data) < len(magic) {
          return ErrInvalidSize
         }

         decf := newDecbuf()
         // 檢驗數(shù)據(jù)完整性
         if decf.UnmarshalString(data[len(data)-len(magic):]) != magic {
          return ErrInvalidSize
         }

         // labels block
         offset := 0
         labels := make([]seriesWithLabel, 0)
         for {
          var labelName string
          labelLen := decf.UnmarshalUint16(data[offset : offset+uint16Size])
          offset += uint16Size

          if labelLen == endOfBlock {
           break
          }

          labelName = decf.UnmarshalString(data[offset : offset+int(labelLen)])
          offset += int(labelLen)
          sidCnt := decf.UnmarshalUint32(data[offset : offset+uint32Size])
          offset += uint32Size

          sidLst := make([]uint32, sidCnt)
          for i := 0; i < int(sidCnt); i++ {
           sidLst[i] = decf.UnmarshalUint32(data[offset : offset+uint32Size])
           offset += uint32Size
          }
          labels = append(labels, seriesWithLabel{Name: labelName, Sids: sidLst})
         }
         meta.Labels = labels

         // series block
         rows := make([]metaSeries, 0)
         for {
          series := metaSeries{}
          sidLen := decf.UnmarshalUint16(data[offset : offset+uint16Size])
          offset += uint16Size

          if sidLen == endOfBlock {
           break
          }

          series.Sid = decf.UnmarshalString(data[offset : offset+int(sidLen)])
          offset += int(sidLen)
          series.StartOffset = decf.UnmarshalUint64(data[offset : offset+uint64Size])
          offset += uint64Size
          series.EndOffset = decf.UnmarshalUint64(data[offset : offset+uint64Size])
          offset += uint64Size
          labelCnt := decf.UnmarshalUint32(data[offset : offset+uint32Size])
          offset += uint32Size

          labelLst := make([]uint32, labelCnt)
          for i := 0; i < int(labelCnt); i++ {
           labelLst[i] = decf.UnmarshalUint32(data[offset : offset+uint32Size])
           offset += uint32Size
          }
          series.Labels = labelLst
          rows = append(rows, series)
         }
         meta.Series = rows

         meta.MinTs = int64(decf.UnmarshalUint64(data[offset : offset+uint64Size]))
         offset += uint64Size
         meta.MaxTs = int64(decf.UnmarshalUint64(data[offset : offset+uint64Size]))
         offset += uint64Size

         return decf.Err()
        }

        至此,對 mandodb 的索引和存儲整體設(shè)計是不是就了然于胸。?? 文檔較長,建議 Star 收藏,畢竟來都來了...

        項目地址:https://github.com/chenjiandongx/mandodb

        瀏覽 83
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            黄片大全 byrwqs41.xyz | 女刑警被扒掉乳罩和内衣内裤被绑 | 欧美日韩国产免费观看 | 美女久久久久久 | 一级性生活小说 | 男人把女人操到爽 | 红桃一区| 男男肉耽高h彩漫 | 久热这里只有精品视频免费观看 | 色婷婷激情综合 |