1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Go 第三方庫(kù)源碼分析:juju/ratelimit

        共 6371字,需瀏覽 13分鐘

         ·

        2021-09-13 20:14

        https://github.com/juju/ratelimit 是一個(gè)基于令牌桶算法的限流器:令牌桶就是想象有一個(gè)固定大小的桶,系統(tǒng)會(huì)以恒定速率向桶中放 Token,桶滿則暫時(shí)不放。漏桶算法和令牌桶算法的主要區(qū)別在于,"漏桶算法"能夠強(qiáng)行限制數(shù)據(jù)的傳輸速率(或請(qǐng)求頻率),而"令牌桶算法"在能夠限制數(shù)據(jù)的平均傳輸速率外,還允許某種程度的突發(fā)傳輸。


        首先看下如何使用:

        import "github.com/juju/ratelimit"
        var tokenBucket ratelimit.Bucket = nil
        func init() { // func NewBucket(fillInterval time.Duration, capacity int64) *Bucket // fillInterval令牌填充的時(shí)間間隔 // capacity令牌桶的最大容量 tokenBucket = ratelimit.NewBucket(200time.Millisecond, 20)}
        func Handler() { available := tokenBucket.TakeAvailable(1) if available <= 0 { // 限流處理 } // handling}

        下面看下源碼實(shí)現(xiàn),juju/ratelimit實(shí)現(xiàn)很簡(jiǎn)單,一共只有兩個(gè)源碼文件和一個(gè)測(cè)試文件:

        ratelimit.goratelimit_test.goreader.go

        下面我們分析下常用的這兩個(gè)接口的實(shí)現(xiàn):


        1,ratelimit.NewBucket

        傳入的兩個(gè)參數(shù)分別是產(chǎn)生令牌的的間隔和桶的容量

        func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {  return NewBucketWithClock(fillInterval, capacity, nil)}
        func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {  return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)}

        默認(rèn)一個(gè)間隔周期內(nèi)就產(chǎn)生一個(gè)token,如果是高并發(fā)情況下,可以通過(guò)參數(shù)quantum控制產(chǎn)生多個(gè)。第三個(gè)參數(shù)是一個(gè)clock  interface,主要是方便mock測(cè)試,如果傳nil用的就是realClock{}

        // Clock represents the passage of time in a way that// can be faked out for tests.type Clock interface {  // Now returns the current time.  Now() time.Time  // Sleep sleeps for at least the given duration.  Sleep(d time.Duration)}

        realClock是實(shí)現(xiàn)了上述接口的結(jié)構(gòu)體:

        // realClock implements Clock in terms of standard time functions.type realClock struct{}
        // Now implements Clock.Now by calling time.Now.func (realClock) Now() time.Time { return time.Now()}
        // Now implements Clock.Sleep by calling time.Sleep.func (realClock) Sleep(d time.Duration) { time.Sleep(d)}

        上面幾個(gè)函數(shù)僅僅是對(duì)這個(gè)函數(shù)的一個(gè)簡(jiǎn)單包裝,加上默認(rèn)參數(shù),方便一般場(chǎng)景的使用,最終都是調(diào)用了這個(gè)函數(shù)

        func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {  if clock == nil {    clock = realClock{}  }  if fillInterval <= 0 {    panic("token bucket fill interval is not > 0")  }  if capacity <= 0 {    panic("token bucket capacity is not > 0")  }  if quantum <= 0 {    panic("token bucket quantum is not > 0")  }  return &Bucket{    clock:           clock,    startTime:       clock.Now(),    latestTick:      0,    fillInterval:    fillInterval,    capacity:        capacity,    quantum:         quantum,    availableTokens: capacity,  }}

        出來(lái)參數(shù)檢驗(yàn)外,最后生成了結(jié)構(gòu)體Bucket的指針

        type Bucket struct {  clock Clock
        // startTime holds the moment when the bucket was // first created and ticks began. startTime time.Time
        // capacity holds the overall capacity of the bucket. capacity int64
        // quantum holds how many tokens are added on // each tick. quantum int64
        // fillInterval holds the interval between each tick. fillInterval time.Duration
        // mu guards the fields below it. mu sync.Mutex
        // availableTokens holds the number of available // tokens as of the associated latestTick. // It will be negative when there are consumers // waiting for tokens. availableTokens int64
        // latestTick holds the latest tick for which // we know the number of tokens in the bucket. latestTick int64}

        Bucket里面出了存儲(chǔ)初始化必要的參數(shù)外,多了兩個(gè)變量:

        availableTokens當(dāng)前可用的令牌數(shù)量

        latestTick從程序運(yùn)行到上一次訪問(wèn)的時(shí)候,一共產(chǎn)生了多少次計(jì)數(shù)(如果quantum等于1的話 ,就是一共產(chǎn)生的令牌數(shù)量


        2,TakeAvailable

        有一個(gè)參數(shù),每次取的token數(shù)量,一般是一個(gè),為了并發(fā)安全,一般會(huì)加鎖:

        func (tb *Bucket) TakeAvailable(count int64) int64 {  tb.mu.Lock()  defer tb.mu.Unlock()  return tb.takeAvailable(tb.clock.Now(), count)}

        調(diào)用了令牌桶計(jì)算的核心函數(shù)takeAvailable,第一個(gè)參數(shù)表示是當(dāng)前時(shí)間,用于計(jì)算一共產(chǎn)生了多少個(gè)token:

        func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {  if count <= 0 {    return 0  }  tb.adjustavailableTokens(tb.currentTick(now))  if tb.availableTokens <= 0 {    return 0  }  if count > tb.availableTokens {    count = tb.availableTokens  }  tb.availableTokens -= count  return count}

        其中tb.adjustavailableTokens(tb.currentTick(now))用于計(jì)算修改可用token數(shù)量availableTokens,如果availableTokens<=0,說(shuō)明限流了;如果輸入的count比availableTokens,我么最多只能獲取availableTokens個(gè)token,獲取后,我們把a(bǔ)vailableTokens減去已經(jīng)使用的token數(shù)量。

        func (tb *Bucket) currentTick(now time.Time) int64 {  return int64(now.Sub(tb.startTime) / tb.fillInterval)}

        計(jì)算出了從開(kāi)始運(yùn)行到,當(dāng)前時(shí)間內(nèi)時(shí)間一共跳變了多少次,也就是一共產(chǎn)生了多少次令牌。

        func (tb *Bucket) adjustavailableTokens(tick int64) {  lastTick := tb.latestTick  tb.latestTick = tick  if tb.availableTokens >= tb.capacity {    return  }  tb.availableTokens += (tick - lastTick) * tb.quantum  if tb.availableTokens > tb.capacity {    tb.availableTokens = tb.capacity  }  return}

        1,如果可用token數(shù)量大于等于令牌桶的容量,說(shuō)明很長(zhǎng)時(shí)間沒(méi)有流量來(lái)獲取token了,不用處理。

        2,計(jì)算上一次獲取token 到現(xiàn)時(shí)刻,產(chǎn)生的token數(shù)量,把它加到availableTokens上

        3,如果availableTokens數(shù)量比capacity大,說(shuō)明溢出了,修改availableTokens為capacity。


        以上就是令牌桶算法的核心邏輯。當(dāng)然,這個(gè)包還封裝了一些其他的靈活的取令牌的接口,比如

        func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {  tb.mu.Lock()  defer tb.mu.Unlock()  return tb.take(tb.clock.Now(), count, maxWait)}

        這個(gè)函數(shù)就是獲取,在maxWait time.Duration超時(shí)的前提下,產(chǎn)生count個(gè)token,需要等待的時(shí)間間隔。

        func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {  if count <= 0 {    return 0, true  }
        tick := tb.currentTick(now) tb.adjustavailableTokens(tick) avail := tb.availableTokens - count if avail >= 0 { tb.availableTokens = avail return 0, true } // Round up the missing tokens to the nearest multiple // of quantum - the tokens won't be available until // that tick.
        // endTick holds the tick when all the requested tokens will // become available. endTick := tick + (-avail+tb.quantum-1)/tb.quantum endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval) waitTime := endTime.Sub(now) if waitTime > maxWait { return 0, false } tb.availableTokens = avail return waitTime, true}

        函數(shù)的前半部分和takeAvailable一模一樣,后面邏輯表示,如果令牌不夠的情況下:

        1,計(jì)算還缺多少個(gè)令牌

        2,計(jì)算缺這么多令牌需要跳變多少次

        3,計(jì)算跳變這些次數(shù)需要的時(shí)間

        4,判斷需要的時(shí)間是否超時(shí)

        還有一個(gè)wait接口,用來(lái)計(jì)算,獲取count個(gè)令牌需要的時(shí)間,然后sleep這么長(zhǎng)時(shí)間。

        func (tb *Bucket) Wait(count int64) {  if d := tb.Take(count); d > 0 {    tb.clock.Sleep(d)  }}

        以上就是令牌桶算法的核心源碼實(shí)現(xiàn),

        ratelimit/reader.go

        里面實(shí)現(xiàn)了基于上述限流器實(shí)現(xiàn)的讀限速和寫(xiě)限速,原理是通過(guò)讀寫(xiě)buff的長(zhǎng)度來(lái)控制Wait函數(shù)的等待時(shí)間,實(shí)現(xiàn)讀寫(xiě)限速的

        func (r *reader) Read(buf []byte) (int, error) {  n, err := r.r.Read(buf)  if n <= 0 {    return n, err  }  r.bucket.Wait(int64(n))  return n, err}


        推薦閱讀


        福利

        我為大家整理了一份從入門(mén)到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門(mén)看什么,進(jìn)階看什么。關(guān)注公眾號(hào) 「polarisxu」,回復(fù) ebook 獲??;還可以回復(fù)「進(jìn)群」,和數(shù)萬(wàn) Gopher 交流學(xué)習(xí)。

        瀏覽 40
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            97超级碰碰碰碰久久久久 | 一级片免费试看 | 亚洲黄色电影视频 | 扒下她的小内裤揉弄当众羞辱 | 最新国产亚洲免费在线视频 | 丰满的已婚女邻居hd | 人人看人人摸人人透 | 极品少妇馒头泬99 | 91精品婷婷国产综合久久蝌蚪 | 日本啪啪一级视频 |