1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        最清晰易懂的 Go WaitGroup 剖析

        共 4742字,需瀏覽 10分鐘

         ·

        2021-04-10 10:20

        本篇主要介紹 WaitGroup 的一些特性,讓我們從本質(zhì)上去了解 WaitGroup。關(guān)于 WaitGroup 的基本用法這里就不做過多介紹了。相對于《這可能是最容易理解的 Go Mutex 源碼剖析》來說,WaitGroup 就簡單的太多了。

        源碼剖析

        Add()

        add

        Wait()

        wait
        type WaitGroup struct {
         noCopy noCopy
         state1 [3]uint32
        }

        WaitGroup 底層結(jié)構(gòu)看起來簡單,但 WaitGroup.state1 其實(shí)代表三個(gè)字段:counter,waiter,sema。

        • counter :可以理解為一個(gè)計(jì)數(shù)器,計(jì)算經(jīng)過 wg.Add(N), wg.Done() 后的值。
        • waiter :當(dāng)前等待 WaitGroup 任務(wù)結(jié)束的等待者數(shù)量。其實(shí)就是調(diào)用 wg.Wait() 的次數(shù),所以通常這個(gè)值是 1 。
        • sema :信號量,用來喚醒 Wait() 函數(shù)。

        為什么要將 counter 和 waiter 放在一起 ?

        其實(shí)是為了保證 WaitGroup 狀態(tài)的完整性。舉個(gè)例子,看下面的一段源碼

        // sync/waitgroup.go:L79 --> Add()
        if v > 0 || w == 0 { // v => counter, w => waiter
            return
        }
        // ...
        *statep = 0
        for ; w != 0; w-- {
            runtime_Semrelease(semap, false0)
        }

        當(dāng)同時(shí)發(fā)現(xiàn) wg.counter <= 0 && wg.waiter != 0 時(shí),才會去喚醒等待的 waiters,讓等待的協(xié)程繼續(xù)運(yùn)行。但是使用 WaitGroup 的調(diào)用方一般都是并發(fā)操作,如果不同時(shí)獲取的 counter 和 waiter 的話,就會造成獲取到的 counter 和 waiter 可能不匹配,造成程序 deadlock 或者程序提前結(jié)束等待。

        如何獲取 counter 和 waiter ?

        對于 wg.state 的狀態(tài)變更,WaitGroup 的 Add(),Wait() 是使用 atomic 來做原子計(jì)算的(為了避免鎖競爭)。但是由于 atomic 需要使用者保證其 64 位對齊,所以將 counter 和 waiter 都設(shè)置成 uint32,同時(shí)作為一個(gè)變量,即滿足了 atomic 的要求,同時(shí)也保證了獲取 waiter 和 counter 的狀態(tài)完整性。但這也就導(dǎo)致了 32位,64位機(jī)器上獲取 state 的方式并不相同。如下圖:簡單解釋下:

        因?yàn)?64 位機(jī)器上本身就能保證 64 位對齊,所以按照 64 位對齊來取數(shù)據(jù),拿到 state1[0], state1[1] 本身就是64 位對齊的。但是 32 位機(jī)器上并不能保證 64 位對齊,因?yàn)?32 位機(jī)器是 4 字節(jié)對齊,如果也按照 64 位機(jī)器取 state[0],state[1] 就有可能會造成 atmoic 的使用錯(cuò)誤。

        于是 32 位機(jī)器上空出第一個(gè) 32 位,也就使后面 64 位天然滿足 64 位對齊,第一個(gè) 32 位放入 sema 剛好合適。早期 WaitGroup 的實(shí)現(xiàn) sema 是和 state1 分開的,也就造成了使用 WaitGroup 就會造成 4 個(gè)字節(jié)浪費(fèi),不過 go1.11 之后就是現(xiàn)在的結(jié)構(gòu)了。

        為什么流程圖里缺少了 Done ?

        其實(shí)并不是,是因?yàn)?Done 的實(shí)現(xiàn)就是 Add. 只不過我們常規(guī)用法 wg.Add(1) 是加 1 ,wg.Done() 是減 1,即 wg.Done() 可以用 wg.Add(-1) 來代替。盡管我們知道 wg.Add 可以傳遞負(fù)數(shù)當(dāng) wg.Done  使用,但是還是別這么用。

        退出waitgroup的條件

        其實(shí)就一個(gè)條件, WaitGroup.counter 等于 0

        日常開發(fā)中特殊需求

        1. 控制超時(shí)/錯(cuò)誤控制

        雖說 WaitGroup 能夠讓主 Goroutine 等待子 Goroutine 退出,但是 WaitGroup 遇到一些特殊的需求,如:超時(shí),錯(cuò)誤控制,并不能很好的滿足,需要做一些特殊的處理。

        用戶在電商平臺中購買某個(gè)貨物,為了計(jì)算用戶能優(yōu)惠的金額,需要去獲取 A 系統(tǒng)(權(quán)益系統(tǒng)),B 系統(tǒng)(角色系統(tǒng)),C 系統(tǒng)(商品系統(tǒng)),D 系統(tǒng)(xx系統(tǒng))。為了提高程序性能,可能會同時(shí)發(fā)起多個(gè) Goroutine 去訪問這些系統(tǒng),必然會使用 WaitGroup 等待數(shù)據(jù)的返回,但是存在一些問題:

        1. 當(dāng)某個(gè)系統(tǒng)發(fā)生錯(cuò)誤,等待的 Goroutine 如何感知這些錯(cuò)誤?
        2. 當(dāng)某個(gè)系統(tǒng)響應(yīng)過慢,等待的 Goroutine 如何控制訪問超時(shí)?

        這些問題都是直接使用 WaitGroup 沒法處理的。如果直接使用 channel 配合 WaitGroup 來控制超時(shí)和錯(cuò)誤返回的話,封裝起來并不簡單,而且還容易出錯(cuò)。我們可以采用 ErrGroup 來代替 WaitGroup。

        有關(guān) ErrGroup 的用法這里就不再闡述。golang.org/x/sync/errgroup

        package main

        import (
         "context"
         "fmt"
         "golang.org/x/sync/errgroup"
         "time"
        )

        func main() {
         ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
         defer cancel()
         errGroup, newCtx := errgroup.WithContext(ctx)

         done := make(chan struct{})
         go func() {
          for i := 0; i < 10; i++ {
           errGroup.Go(func() error {
            time.Sleep(time.Second * 10)
            return nil
           })
          }
          if err := errGroup.Wait(); err != nil {
           fmt.Printf("do err:%v\n", err)
           return
          }
          done <- struct{}{}
         }()

         select {
         case <-newCtx.Done():
          fmt.Printf("err:%v ", newCtx.Err())
          return
         case <-done:
         }
         fmt.Println("success")
        }

        2. 控制 Goroutine 數(shù)量

        場景模擬:大概有 2000 - 3000 萬個(gè)數(shù)據(jù)需要處理,根據(jù)對服務(wù)器的測試,當(dāng)啟動(dòng) 200 個(gè) Goroutine 處理時(shí)性能最佳。如何控制?

        遇到諸如此類的問題時(shí),單純使用 WaitGroup 是不行的。既要保證所有的數(shù)據(jù)都能被處理,同時(shí)也要保證同時(shí)最多只有 200 個(gè) Goroutine。這種問題需要 WaitGroup 配合 Channel 一塊使用。

        package main

        import (
         "fmt"
         "sync"
         "time"
        )

        func main() {
         var wg = sync.WaitGroup{}
         manyDataList := []int{12345678910}
         ch := make(chan bool3)
         for _, v := range manyDataList {
          wg.Add(1)
          go func(data int) {
           defer wg.Done()

           ch <- true
           fmt.Printf("go func: %d, time: %d\n", data, time.Now().Unix())
           time.Sleep(time.Second)
           <-ch
          }(v)
         }
         wg.Wait()
        }

        使用注意點(diǎn)

        使用 WaitGroup 同樣不能被復(fù)制。具體例子就不再分析了。具體分析過程可以參見《這可能是最容易理解的 Go Mutex 源碼剖析》

        WaitGroup 的剖析到這里基本就結(jié)束了。有什么想跟我交流的,歡迎評論區(qū)留言。



        推薦閱讀


        福利

        我為大家整理了一份從入門到進(jìn)階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門看什么,進(jìn)階看什么。關(guān)注公眾號 「polarisxu」,回復(fù) ebook 獲??;還可以回復(fù)「進(jìn)群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。

        瀏覽 41
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            操逼挺好我看看 | 成人特级毛片全部免费播放 | 国产乱人伦无无码视频 | 双性宝宝你夹得我好难受h | 午夜精品毛片色情四级 | 亚洲see图 | 五月色综合 | 五月色在线播放 | 国产精品美女www爽爽爽动态图 | 国产激情AV在线 |