手摸手Go 也談sync.WaitGroup
最近因?yàn)楣ぷ魃系氖虑楦聲鄬τ悬c(diǎn)兒慢,這周末又加了天班。然后昨天好好休息了下,順便翻了下《云雀叫了一整天》,看到一首小詩覺得不錯(cuò)分享給大家。
從前慢
木心
記得早先少年時(shí)
大家誠誠懇懇
說一句是一句
清早上火車站
長街黑暗無行人
賣豆?jié){的小店冒著熱氣
從前的日色變得慢
車,馬,郵件都慢
一生只夠愛一個(gè)人
從前的鎖也好看
鑰匙精美有樣子
你鎖了人家就懂了
小小一詩,人生盡在其中。
好了回歸正題,Go sync包目前只剩sync.WaitGroup沒分析了,今天起個(gè)大早趕上這一篇。
sync.WaitGroup是Go提供的一種允許一個(gè)goroutine等待一組goroutine完成任務(wù)的機(jī)制,類似于Java中的CountDownLatch。主goroutine調(diào)用Add方法設(shè)置需要等待的goroutine的數(shù)量,每個(gè)goroutine完成時(shí)調(diào)用Done方法。與此同時(shí),wait方法用于阻塞主goroutine直到所有其他goroutine執(zhí)行完畢。
基本使用
利用sync.WaitGroup完成一個(gè)多協(xié)程任務(wù)
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func init() {
rand.Seed(time.Now().Unix())
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
fmt.Println(fmt.Sprintf("I'm finish my work at %s", time.Now().Format("2006-01-02 15:04:05")))
wg.Done()
}()
}
wg.Wait()
}
sync.WaitGroup源碼分析
數(shù)據(jù)結(jié)構(gòu)
// 第一次使用后不允許被拷貝
type WaitGroup struct {
noCopy noCopy
//64位值: 高32位為計(jì)數(shù)器 低32位為等待計(jì)數(shù)
//64位原子操作要求64位對齊,但是32位編譯器無法保證這一點(diǎn)。所以我們分配了12字節(jié),
//其中對齊的8字節(jié)作存儲state 剩下的4字節(jié)存儲sema
state1 [3]uint32
}
sync.WaitGroup結(jié)構(gòu)比較簡單,只包含一個(gè)防止拷貝的noCopy字段和一個(gè)長度為3的uint32數(shù)組。其核心在于對state1這個(gè)字段的操作,其字段含義體現(xiàn)在state()方法:
// 從wg.state1返回指向state和sema的指針
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
可見state1字段存儲了statep和semap,至于if-else的邏輯是因?yàn)樵硬僮餍枰獢?shù)據(jù)8字節(jié)對齊,否則程序會panic。故而WaitGroup會選擇使用uintptr(unsafe.Pointer(&wg.state1))%8 == 0先判斷是否是8字節(jié)對齊,如果不是則拿4個(gè)字節(jié)做下padding。(因?yàn)槟壳按蠖鄶?shù)平臺CPU字長都是8或4字節(jié))關(guān)于內(nèi)存對齊,如果你還不清楚,那你一定是沒讀過之前那篇《手摸手Go 你的內(nèi)存對齊了嗎》

字段state1剝離出了statep和semap
statep表示當(dāng)前WaitGroup當(dāng)前的狀態(tài),它的高32位為counter表示計(jì)數(shù)器,低32位waiters表示W(wǎng)ait等待的goroutine數(shù)量semap表示信號量,調(diào)用Wait的goroutine會被阻塞到這個(gè)信號量上

其核心邏輯如上圖,接下來看源碼
操作方法
WaitGroup提供了三個(gè)方法
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
Add 負(fù)責(zé)修改counter值以及釋放阻塞在semp信號量的goroutine Done通過調(diào)用Add遞減counter值 Wait阻塞等待counter值為0為止
Add
Add操作入?yún)?code style="font-size: 14px;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">delta可正可負(fù),根據(jù)delta值更新counter。
當(dāng)counter為0時(shí),所有等待時(shí)阻塞的goroutine會被釋放 如果counter為負(fù)數(shù) 則Add會發(fā)生panic
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
//累加計(jì)數(shù)器
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32) //計(jì)數(shù)器
w := uint32(state) //等待的goroutine數(shù)量
if v < 0 {//counter不能為負(fù)數(shù)
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// Add不得與Wait同時(shí)進(jìn)行
// 如果看到counter==0,則Wait不會增加等待者數(shù)量
// 仍然進(jìn)行廉價(jià)的完整性檢查以檢測WaitGroup的濫用
if *statep != state { //表明Add和Wait方法同時(shí)調(diào)用了
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 重置waiters count 為 0.
*statep = 0
for ; w != 0; w-- {
//喚醒阻塞在semap上的goroutine
runtime_Semrelease(semap, false, 0)
}
}
*statep!=state到這個(gè)檢查點(diǎn)一定是counter==0并且waiters>0,且*statep!=state就panic,表明sync.WaitGroup不允許在waiters>0未執(zhí)行完Wait方法過程中調(diào)用Add()或Wait()方法修改statep。
總結(jié)來說:當(dāng)counter為零時(shí)delta為正數(shù)的調(diào)用必須在wait方法調(diào)用之前發(fā)生。當(dāng)counter大于零時(shí)delta為正數(shù)或負(fù)數(shù)時(shí)的調(diào)用 隨時(shí)都可能發(fā)生。通常,這意味著對Add的調(diào)用應(yīng)該在創(chuàng)建goroutine或要等待的其他事件的語句之前執(zhí)行。如果使用WaitGroup來等待幾個(gè)獨(dú)立的事件集,則必須在所有先前的Wait調(diào)用返回之后再進(jìn)行新的Add調(diào)用。
Done
通過調(diào)用Add(-1),遞減counter值
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
Wait
Wait 會一直阻塞到WaitGroup的counter為0為止
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// counter為0 則不需要等待
return
}
// 增加等待者數(shù)量
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
Wait方法先是從state1中獲取statep和semap進(jìn)入一個(gè)for的無限循環(huán), atomic.LoadUint64加載statep,從而獲取高32位的counter和低32位的waiters。如果 counter==0,表示無需等待 直接返回如果 counter!=0,嘗試將semap+1,如果失敗則回到步驟一繼續(xù)執(zhí)行如果 atomic.CompareAndSwapUint64(statep, state, state+1)成功,則調(diào)用runtime_Semacquire(semap)將當(dāng)前goroutine阻塞在信號量semap上。檢查 *statep != 0則表明Wait方法未執(zhí)行完畢前,WaitGroup又被復(fù)用了,此時(shí)會panic。
總結(jié)
WaitGroup源碼還是比較簡單的,通過原子操作state1和信號量來協(xié)調(diào)goroutine工作。其中state1的設(shè)計(jì)也可以說是內(nèi)存對齊的一個(gè)最佳實(shí)踐。通過閱讀源碼也掌握了使用WaitGroup的正確姿勢:
Add()和Done()均可修改WaitGroup的計(jì)數(shù)數(shù),但是要保證計(jì)數(shù)不會修改為負(fù)數(shù),否則會發(fā)生panicWait()方法必須等待全部Add()方法調(diào)用完畢之后再調(diào)用,否則也可能導(dǎo)致panicWaitGroup是可以重復(fù)使用的。但前提是上一次的goroutine都調(diào)用Wait完畢后才能繼續(xù)復(fù)用。
