1. 手摸手Go 也談sync.WaitGroup

        共 6012字,需瀏覽 13分鐘

         ·

        2021-03-17 19:37

        最近因?yàn)楣ぷ魃系氖虑楦聲鄬τ悬c(diǎn)兒慢,這周末又加了天班。然后昨天好好休息了下,順便翻了下《云雀叫了一整天》,看到一首小詩覺得不錯(cuò)分享給大家。

        從前慢

        木心

        記得早先少年時(shí)


        大家誠誠懇懇

        說一句是一句


        清早上火車站

        長街黑暗無行人

        賣豆?jié){的小店冒著熱氣


        從前的日色變得慢

        車,馬,郵件都慢

        一生只夠愛一個(gè)人


        從前的鎖也好看

        鑰匙精美有樣子

        你鎖了人家就懂了

        小小一詩,人生盡在其中。

        好了回歸正題,Go sync包目前只剩sync.WaitGroup沒分析了,今天起個(gè)大早趕上這一篇。

        sync.WaitGroup是Go提供的一種允許一個(gè)goroutine等待一組goroutine完成任務(wù)的機(jī)制,類似于Java中的CountDownLatch。主goroutine調(diào)用Add方法設(shè)置需要等待的goroutine的數(shù)量,每個(gè)goroutine完成時(shí)調(diào)用Done方法。與此同時(shí),wait方法用于阻塞主goroutine直到所有其他goroutine執(zhí)行完畢。

        基本使用

        利用sync.WaitGroup完成一個(gè)多協(xié)程任務(wù)

        package main

        import (
         "fmt"
         "math/rand"
         "sync"
         "time"
        )

        func init() {
         rand.Seed(time.Now().Unix())
        }
        func main() {
         wg := sync.WaitGroup{}
         for i := 0; i < 3; i++ {
          wg.Add(1)
          go func() {
           time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
           fmt.Println(fmt.Sprintf("I'm finish my work at %s", time.Now().Format("2006-01-02 15:04:05")))
           wg.Done()
          }()
         }
         wg.Wait()
        }

        sync.WaitGroup源碼分析

        數(shù)據(jù)結(jié)構(gòu)

        // 第一次使用后不允許被拷貝
        type WaitGroup struct {
         noCopy noCopy
          //64位值: 高32位為計(jì)數(shù)器 低32位為等待計(jì)數(shù)
          //64位原子操作要求64位對齊,但是32位編譯器無法保證這一點(diǎn)。所以我們分配了12字節(jié),
          //其中對齊的8字節(jié)作存儲state 剩下的4字節(jié)存儲sema
         state1 [3]uint32
        }

        sync.WaitGroup結(jié)構(gòu)比較簡單,只包含一個(gè)防止拷貝的noCopy字段和一個(gè)長度為3的uint32數(shù)組。其核心在于對state1這個(gè)字段的操作,其字段含義體現(xiàn)在state()方法:

        // 從wg.state1返回指向state和sema的指針
        func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
         if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
          return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
         } else {
          return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
         }
        }

        可見state1字段存儲了statepsemap,至于if-else的邏輯是因?yàn)樵硬僮餍枰獢?shù)據(jù)8字節(jié)對齊,否則程序會panic。故而WaitGroup會選擇使用uintptr(unsafe.Pointer(&wg.state1))%8 == 0先判斷是否是8字節(jié)對齊,如果不是則拿4個(gè)字節(jié)做下padding。(因?yàn)槟壳按蠖鄶?shù)平臺CPU字長都是8或4字節(jié))關(guān)于內(nèi)存對齊,如果你還不清楚,那你一定是沒讀過之前那篇《手摸手Go 你的內(nèi)存對齊了嗎

        state1

        字段state1剝離出了statepsemap

        • statep 表示當(dāng)前WaitGroup當(dāng)前的狀態(tài),它的高32位為counter表示計(jì)數(shù)器,低32位waiters表示W(wǎng)ait等待的goroutine數(shù)量
        • semap表示信號量,調(diào)用Waitgoroutine會被阻塞到這個(gè)信號量上

        waitgroup

        其核心邏輯如上圖,接下來看源碼

        操作方法

        WaitGroup提供了三個(gè)方法

        func (wg *WaitGroup) Add(delta int)
        func (wg *WaitGroup) Done()
        func (wg *WaitGroup) Wait() 
        • Add 負(fù)責(zé)修改counter值以及釋放阻塞在semp信號量的goroutine
        • Done通過調(diào)用Add遞減counter值
        • Wait阻塞等待counter值為0為止

        Add

        Add操作入?yún)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">delta可正可負(fù),根據(jù)delta值更新counter。

        • 當(dāng)counter為0時(shí),所有等待時(shí)阻塞的goroutine會被釋放
        • 如果counter為負(fù)數(shù) 則Add會發(fā)生panic
        func (wg *WaitGroup) Add(delta int) {
         statep, semap := wg.state()
          //累加計(jì)數(shù)器
         state := atomic.AddUint64(statep, uint64(delta)<<32)
         v := int32(state >> 32//計(jì)數(shù)器
         w := uint32(state) //等待的goroutine數(shù)量

         if v < 0 {//counter不能為負(fù)數(shù)
          panic("sync: negative WaitGroup counter")
         }
         if w != 0 && delta > 0 && v == int32(delta) {
          panic("sync: WaitGroup misuse: Add called concurrently with Wait")
         }
         if v > 0 || w == 0 {
          return
         }
          // Add不得與Wait同時(shí)進(jìn)行
          // 如果看到counter==0,則Wait不會增加等待者數(shù)量
          // 仍然進(jìn)行廉價(jià)的完整性檢查以檢測WaitGroup的濫用
         if *statep != state { //表明Add和Wait方法同時(shí)調(diào)用了
          panic("sync: WaitGroup misuse: Add called concurrently with Wait")
         }
         // 重置waiters count 為 0.
         *statep = 0
         for ; w != 0; w-- {
            //喚醒阻塞在semap上的goroutine
          runtime_Semrelease(semap, false0)
         }
        }

        *statep!=state到這個(gè)檢查點(diǎn)一定是counter==0并且waiters>0,且*statep!=state就panic,表明sync.WaitGroup不允許在waiters>0未執(zhí)行完Wait方法過程中調(diào)用Add()Wait()方法修改statep。

        總結(jié)來說:當(dāng)counter為零時(shí)delta為正數(shù)的調(diào)用必須在wait方法調(diào)用之前發(fā)生。當(dāng)counter大于零時(shí)delta為正數(shù)或負(fù)數(shù)時(shí)的調(diào)用 隨時(shí)都可能發(fā)生。通常,這意味著對Add的調(diào)用應(yīng)該在創(chuàng)建goroutine或要等待的其他事件的語句之前執(zhí)行。如果使用WaitGroup來等待幾個(gè)獨(dú)立的事件集,則必須在所有先前的Wait調(diào)用返回之后再進(jìn)行新的Add調(diào)用。

        Done

        通過調(diào)用Add(-1),遞減counter值

        // Done decrements the WaitGroup counter by one.
        func (wg *WaitGroup) Done() {
         wg.Add(-1)
        }

        Wait

        Wait 會一直阻塞到WaitGroup的counter為0為止

        func (wg *WaitGroup) Wait() {
         statep, semap := wg.state()
         for {
          state := atomic.LoadUint64(statep)
          v := int32(state >> 32)
          w := uint32(state)
          if v == 0 {
           // counter為0 則不需要等待
           return
          }
          // 增加等待者數(shù)量
          if atomic.CompareAndSwapUint64(statep, state, state+1) {
           runtime_Semacquire(semap)
           if *statep != 0 {
            panic("sync: WaitGroup is reused before previous Wait has returned")
           }
           return
          }
         }
        }
        1. Wait方法先是從state1中獲取statepsemap
        2. 進(jìn)入一個(gè)for的無限循環(huán),atomic.LoadUint64加載statep,從而獲取高32位的counter和低32位的waiters。
        3. 如果counter==0,表示無需等待 直接返回
        4. 如果counter!=0,嘗試將semap+1,如果失敗則回到步驟一繼續(xù)執(zhí)行
        5. 如果atomic.CompareAndSwapUint64(statep, state, state+1)成功,則調(diào)用runtime_Semacquire(semap)將當(dāng)前goroutine阻塞在信號量semap上。
        6. 檢查*statep != 0則表明Wait方法未執(zhí)行完畢前,WaitGroup又被復(fù)用了,此時(shí)會panic。

        總結(jié)

        WaitGroup源碼還是比較簡單的,通過原子操作state1和信號量來協(xié)調(diào)goroutine工作。其中state1的設(shè)計(jì)也可以說是內(nèi)存對齊的一個(gè)最佳實(shí)踐。通過閱讀源碼也掌握了使用WaitGroup的正確姿勢:

        • Add()Done()均可修改WaitGroup的計(jì)數(shù)數(shù),但是要保證計(jì)數(shù)不會修改為負(fù)數(shù),否則會發(fā)生panic
        • Wait()方法必須等待全部Add()方法調(diào)用完畢之后再調(diào)用,否則也可能導(dǎo)致panic
        • WaitGroup是可以重復(fù)使用的。但前提是上一次的goroutine都調(diào)用Wait完畢后才能繼續(xù)復(fù)用。


        瀏覽 120
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
          
          

            1. www.911国产 | 好湿好紧太爽了h黄蓉 | 亚洲淫香淫色 | 操逼的毛片 | 国产成人无码精品久在线观看 |