使用context、WaitGroup優(yōu)雅處理goroutine
via:
https://justbartek.ca/p/golang-context-wg-go-routines/
作者:Bartek
今天給大家分享一篇 如何使用 context、waitGroup 實(shí)現(xiàn)程序快速且優(yōu)雅退出 的文章!
原文如下:
最近,我正在編寫一個“滴答器”的應(yīng)用程序,每次“滴答”時可能會產(chǎn)生數(shù)千的 goroutine。我想確保當(dāng)應(yīng)用終止時,即使有一些特定的 goroutine 處理比較緩慢,它也能快速而優(yōu)雅地退出。
剛開始的時候,圍繞如何輸出日志,我使用 sync.WaitGroup 實(shí)現(xiàn)流程控制,但我很快意識到如果我創(chuàng)建了很多 goroutine,即使其中很小一部分沒有立即返回,我的程序會在終止時 hang 住。這讓我重新考慮 context.WithCancel,并理解該如何重新調(diào)整我的程序,使其能快速且優(yōu)雅地退出!
我們可以通過構(gòu)建示例程序一步步來驗證下,最初的示例程序并不會使用前面提到的技術(shù)點(diǎn)。
package main
import (
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"syscall"
"time"
)
func doSomething(ch chan int) {
fmt.Printf("Received job %d\n", <-ch)
}
func init() {
rand.Seed(time.Now().Unix())
}
func main() {
var (
closing = make(chan struct{})
ticker = time.NewTicker(1 * time.Second)
logger = log.New(os.Stderr, "", log.LstdFlags)
batchSize = 6
jobs = make(chan int, batchSize)
)
go func() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
<-signals
close(closing)
}()
loop:
for {
select {
case <-closing:
break loop
case <-ticker.C:
for n := 0; n < batchSize; n++ {
jobs <- n
go doSomething(jobs)
}
logger.Printf("Completed doing %d things.", batchSize)
}
}
}
執(zhí)行程序,我們會發(fā)現(xiàn) Received job ... 和 Completed doing ... 會交替輸出,輸出可能類似下面這樣:
Received job 0
Received job 1
Received job 2
2021/02/08 21:30:59 Completed doing 6 things.
Received job 3
Received job 4
Received job 5
2021/02/08 21:31:00 Completed doing 6 things.
多次打印的結(jié)果并不一致!這是合理的,我們都知道 goroutines 并不會阻塞,所以除非我們對它做些什么,否則協(xié)程里的代碼會立即執(zhí)行。
我們添加 WaitGroup 來完善下流程,先在 var 代碼塊中定義變量:
var (
..
wg sync.WaitGroup
)
調(diào)整下 loop 循環(huán):
for n := 0; n < batchSize; n++ {
wg.Add(1)
jobs <- n
go doSomething(&wg, jobs)
}
wg.Wait()
logger.Printf("Completed doing %d things.", batchSize)
最后,修改協(xié)程函數(shù):
func doSomething(wg *sync.WaitGroup, ch chan int) {
defer wg.Done()
fmt.Printf("Received job %d\n", <-ch)
}
WaitGroups 會等待一組 goroutines 執(zhí)行完成,仔細(xì)閱讀代碼我們發(fā)現(xiàn):
每次循環(huán)時 WaitGroup 的計數(shù)器會加 1,加 1 原因是因為在 goroutine 里每次調(diào)用 wg.Done() 計數(shù)器會減一,這樣 goroutine 執(zhí)行完成返回之后計數(shù)器能維持平衡; 在調(diào)用 logger 之前,我們添加了 wg.Wait(),這樣當(dāng)程序執(zhí)行到這里的時候便會阻塞直到 WaitGroups 的計數(shù)器減為 0。當(dāng)所有 goroutines 調(diào)用 wg.Done() 之后,計數(shù)器便會恢復(fù)成 0。
很簡單,是不是?我們再次執(zhí)行程序,可以看到結(jié)果比之前的更一致:
2021/02/08 21:46:47 Completed doing 6 things.
Received job 0
Received job 1
Received job 2
Received job 4
Received job 5
Received job 3
2021/02/08 21:46:48 Completed doing 6 things.
Received job 0
Received job 2
Received job 3
Received job 4
Received job 5
Received job 1
順便說一句,與預(yù)期的一樣,jobs 并不會按順序執(zhí)行,因為我們并沒有采取任何措施來確保這一點(diǎn)。
在我們繼續(xù)之前,按照目前的狀態(tài)執(zhí)行程序并嘗試使用 Control+D 來終止程序,程序退出不會出現(xiàn)任何問題。
為了證明程序需要進(jìn)一步完善,讓我們添加一些代碼模擬真實(shí)業(yè)務(wù)場景。我們新建一個函數(shù),函數(shù)里面調(diào)用外部 API 并等待請求響應(yīng)。請求過程中,我們將會調(diào)用 context.WithCancel 取消請求。
首先,創(chuàng)建一個未使用 context 的函數(shù)。下面的代碼更復(fù)雜,有必要的話請看注釋:
func doAPICall(wg *sync.WaitGroup) error {
defer wg.Done()
req, err := http.NewRequest("GET", "https://httpstat.us/200", nil)
if err != nil {
return err
}
// The httpstat.us API accepts a sleep parameter which sleeps the request for the
// passed time in ms
q := req.URL.Query()
sleepMin := 1000
sleepMax := 4000
q.Set("sleep", fmt.Sprintf("%d", rand.Intn(sleepMax-sleepMin)+sleepMin))
req.URL.RawQuery = q.Encode()
// Make the request to the API in an anonymous function, using a channel to
// communicate the results
c := make(chan error, 1)
go func() {
// For the purposes of this example, we're not doing anything with the response.
_, err := http.DefaultClient.Do(req)
c <- err
}()
// Block until the channel is populated
return <-c
}
修改定時器“滴答”,刪除調(diào)用 doSomething() 的代碼、刪除 jobs channel(不會再使用到它)并且調(diào)用 doAPICall()。
for n := 0; n < batchSize; n++ {
wg.Add(1)
go doAPICall(&wg)
}
執(zhí)行程序并再次嘗試退出程序:
WaitGroup 會等待所有的 goroutines 完成; doAPICall() 調(diào)用會發(fā)生阻塞直到 httpstat.us() 接口返回,調(diào)用耗時大概 1000ms ~ 4000ms; 取決于你終止程序的時間,退出會變得很困難(耗時比較長),試一次可能發(fā)現(xiàn)不了問題,在不同的時刻多嘗試幾次;
現(xiàn)在來演示 context.WithCancel 如何進(jìn)一步控制程序取消。當(dāng) context.WithCancel 初始化之后,會返回一個 context 和取消函數(shù) CancelFunc()。這個取消函數(shù)會取消 context,第一次聽到這個會困惑。閱讀 Go 官方博客的文章 Go Concurrency Patterns: Context[1] 對于進(jìn)一步理解 context.WithCancel 會有所幫助,推薦閱讀完本篇文章之后再看!
ok,我們回到正文。為了實(shí)現(xiàn)取消流程控制,需要修改下代碼。首先,使用 context 創(chuàng)建一個取消函數(shù):
var (
ctx, cancel = context.WithCancel(context.Background())
...
)
接著,在匿名函數(shù)里監(jiān)聽程序終止的信號,signals 被通知之后調(diào)用 CancelFunc,這意味著上下文將被視為已取消:
go func() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
<-signals
logger.Println("Initiating shutdown of producer.")
cancel()
close(closing)
}()
接著,調(diào)整 doAPICall() 函數(shù),多接收一個 context 參數(shù);使用 select-case 修改函數(shù)返回,等待 ctx.Done 或等待請求響應(yīng)。為了簡介,只展示了函數(shù)部分代碼:
func doAPICall(ctx context.Context, ....) {
// Cancel the request if ctx.Done is closed or await the response
select {
case <-ctx.Done():
return ctx.Err()
case err := <-c:
return err
}
}
最后,確保調(diào)用 doAPICall() 函數(shù)時傳遞了 context 參數(shù)?,F(xiàn)在,運(yùn)行程序并多次在不同的時間點(diǎn)終止程序。
現(xiàn)在會發(fā)生什么?程序會立即退出。select-case 代碼會監(jiān)聽 ctx.Done 是否關(guān)閉或者接口請求是否響應(yīng),哪個 case 的 channel 信號先到就先執(zhí)行誰。當(dāng)應(yīng)用程序終止時,ctx.Done() 優(yōu)先執(zhí)行并且函數(shù)提前返回,不再關(guān)心請求是否響應(yīng)。WaitGroup 的作用沒變 - 等待一組 goroutines 完成?,F(xiàn)在,程序的終止流程得到很大改善。
Go 的基本哲學(xué)之一就是:
Don't communicate by sharing memory; share memory by communicating.
這里,我們使用 channel 在 goroutines 之間傳遞引用,這使得我們能夠改進(jìn)應(yīng)用程序的流程。
有很多種辦法可以用來改善流程,例如,我們不跨 goroutine 接收 API 的響應(yīng)或者錯誤。值得慶幸的是,Go 很容易就可以實(shí)現(xiàn)這點(diǎn),因此可以將它視為一個起點(diǎn),如果你還想完善,可以嘗試下這些想法。
下面是完整的示例,僅供參考:
package main
import (
"context"
"fmt"
"log"
"math/rand"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
func doAPICall(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()
req, err := http.NewRequest("GET", "https://httpstat.us/200", nil)
if err != nil {
return err
}
// The httpstat.us API accepts a sleep parameter which sleeps the request for the
// passed time in ms
q := req.URL.Query()
sleepMin := 1000
sleepMax := 4000
q.Set("sleep", fmt.Sprintf("%d", rand.Intn(sleepMax-sleepMin)+sleepMin))
req.URL.RawQuery = q.Encode()
c := make(chan error, 1)
go func() {
// For the purposes of this example, we're not doing anything with the response.
_, err := http.DefaultClient.Do(req)
c <- err
}()
// Block until either channel is populated or closed
select {
case <-ctx.Done():
return ctx.Err()
case err := <-c:
return err
}
}
func init() {
rand.Seed(time.Now().Unix())
}
func main() {
var (
closing = make(chan struct{})
ticker = time.NewTicker(1 * time.Second)
logger = log.New(os.Stderr, "", log.LstdFlags)
batchSize = 6
wg sync.WaitGroup
ctx, cancel = context.WithCancel(context.Background())
)
go func() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
<-signals
cancel()
close(closing)
}()
loop:
for {
select {
case <-closing:
break loop
case <-ticker.C:
for n := 0; n < batchSize; n++ {
wg.Add(1)
go doAPICall(ctx, &wg)
}
wg.Wait()
logger.Printf("Completed doing %d things.", batchSize)
}
}
}
最后一點(diǎn),本文部分代碼受到博文 Go Concurrency Patterns: Context[2] 的啟發(fā),再次推薦這篇文章。這篇文章還介紹了其他控制函數(shù),比如:context.WithTimeout 等。Go 官方博客是每個人都應(yīng)該閱讀的寶庫!
參考資料
Go Concurrency Patterns: Context: https://blog.golang.org/context
[2]Go Concurrency Patterns: Context: https://blog.golang.org/context
推薦閱讀
