Go 每日一庫之 tunny
簡介
之前寫過一篇文章介紹了ants這個 goroutine 池實現(xiàn)。當時在網(wǎng)上查看相關(guān)資料的時候,發(fā)現(xiàn)了另外一個實現(xiàn)tunny。趁著時間相近,正好研究一番。也好比較一下這兩個庫。那就讓我們開始吧。
快速開始
本文代碼使用 Go Modules。
創(chuàng)建目錄并初始化:
$ mkdir tunny && cd tunny
$ go mod init github.com/darjun/go-daily-lib/tunny
使用go get從 GitHub 獲取tunny庫:
$ go get -u github.com/Jeffail/tunny
為了方便地和ants做一個對比,我們將ants中的示例重新用tunny實現(xiàn)一遍:還是那個分段求和的例子:
const (
DataSize = 10000
DataPerTask = 100
)
func main() {
numCPUs := runtime.NumCPU()
p := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
var sum int
for _, n := range payload.([]int) {
sum += n
}
return sum
})
defer p.Close()
// ...
}
使用也非常簡單,首先創(chuàng)建一個Pool,這里使用tunny.NewFunc()。
第一個參數(shù)為池子大小,即同時有多少個 worker (也即 goroutine)在工作,這里設(shè)置成邏輯 CPU 個數(shù),對于 CPU 密集型任務(wù),這個值設(shè)置太大無意義,反而有可能導致 goroutine 切換頻繁而降低性能。
第二個參數(shù)傳入一個func(interface{})interface{}的參數(shù)作為任務(wù)處理函數(shù)。后續(xù)傳入數(shù)據(jù)就會調(diào)用這個函數(shù)處理。
池子使用完需要關(guān)閉,這里使用defer p.Close()在程序退出前關(guān)閉。
然后,生成測試數(shù)據(jù),還是 10000 個隨機數(shù),分成 100 組:
nums := make([]int, DataSize)
for i := range nums {
nums[i] = rand.Intn(1000)
}
處理每組數(shù)據(jù):
var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partialSums := make([]int, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
go func(i int) {
partialSums[i] = p.Process(nums[i*DataPerTask : (i+1)*DataPerTask]).(int)
wg.Done()
}(i)
}
wg.Wait()
調(diào)用p.Process()方法,傳入任務(wù)數(shù)據(jù),池子中會選擇空閑的 goroutine 來處理這個數(shù)據(jù)。由于我們上面設(shè)置了處理函數(shù),goroutine 會直接調(diào)用該函數(shù),將這個切片作為參數(shù)傳入。
tunny與ants不同的是,tunny的任務(wù)處理是同步的,即調(diào)用p.Process()方法之后,當前 goroutine 會掛起,直到任務(wù)處理完成之后才會被喚醒。由于是同步的,所以p.Process()方法可以直接返回處理結(jié)果。這也是上面程序在分發(fā)任務(wù)的時候,啟動多個 goroutine 的原因。如果不是每個任務(wù)都啟動一個 goroutine,p.Process()方法會一直等待任務(wù)完成,那么后面的任務(wù)要等到前面的任務(wù)全部執(zhí)行完之后才能執(zhí)行。這樣就發(fā)揮不了并發(fā)的優(yōu)勢了。
這里注意一個小細節(jié),我將for循環(huán)變量作為參數(shù)傳給 goroutine 函數(shù)了。如果不這樣做,所有 goroutine 都共用外層的i,而且 goroutine 開始運行時,for循環(huán)大概率已經(jīng)結(jié)束了,這時i = DataSize/DataPerTask,索引nums[i*DataPerTask : (i+1)*DataPerTask]會越界觸發(fā) panic。
最后統(tǒng)計數(shù)據(jù),驗證結(jié)果:
var sum int
for _, s := range partialSums {
sum += s
}
var expect int
for _, num := range nums {
expect += num
}
fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)
運行:
$ go run main.go
finish all tasks, result is 5010172 expect:5010172
超時
默認情況下,p.Process()會一直阻塞直到任務(wù)完成,即使當前沒有空閑 worker 也會阻塞。我們也可以使用帶超時的Process()方法:ProcessTimed()。傳入一個超時時間間隔,如果超過這個時間還沒有空閑 worker,或者任務(wù)還沒有處理完成,就會終止,并返回一個錯誤。
超時有 2 種情況:
等不到空閑的 worker:所有 worker 一直處理繁忙狀態(tài),正在處理的任務(wù)比較耗時,無法短時間內(nèi)完成; 任務(wù)本身比較耗時。
下面我們編寫一個計算斐波那契的函數(shù),使用遞歸這種低效的實現(xiàn)方法:
func fib(n int) int {
if n <= 1 {
return 1
}
return fib(n-1) + fib(n-2)
}
我們先看任務(wù)比較耗時的情況,創(chuàng)建Pool對象。為了觀察更明顯,在處理函數(shù)中添加了time.Sleep()語句:
p := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
n := payload.(int)
result := fib(n)
time.Sleep(5 * time.Second)
return result
})
defer p.Close()
生成與池容量相等的任務(wù)數(shù),調(diào)用p.ProcessTimed()方法,設(shè)置超時為 1s:
var wg sync.WaitGroup
wg.Add(numCPUs)
for i := 0; i < numCPUs; i++ {
go func(i int) {
n := rand.Intn(30)
result, err := p.ProcessTimed(n, time.Second)
nowStr := time.Now().Format("2006-01-02 15:04:05")
if err != nil {
fmt.Printf("[%s]task(%d) failed:%v\n", nowStr, i, err)
} else {
fmt.Printf("[%s]fib(%d) = %d\n", nowStr, n, result)
}
wg.Done()
}(i)
}
wg.Wait()
因為處理函數(shù)中 sleep 5s,所以任務(wù)在執(zhí)行過程中就超時了。運行:
$ go run main.go
[2021-06-10 16:36:26]task(7) failed:job request timed out
[2021-06-10 16:36:26]task(4) failed:job request timed out
[2021-06-10 16:36:26]task(1) failed:job request timed out
[2021-06-10 16:36:26]task(6) failed:job request timed out
[2021-06-10 16:36:26]task(5) failed:job request timed out
[2021-06-10 16:36:26]task(0) failed:job request timed out
[2021-06-10 16:36:26]task(3) failed:job request timed out
[2021-06-10 16:36:26]task(2) failed:job request timed out
都在同一秒中超時。
我們將任務(wù)數(shù)量翻倍,再將處理函數(shù)中的 sleep 改為 990ms,保證前一批任務(wù)能順利完成,后續(xù)任務(wù)或者由于等不到空閑 worker,或者由于執(zhí)行時間過長而超時返回。運行:
$ go run main.go
[2021-06-10 16:42:46]fib(11) = 144
[2021-06-10 16:42:46]fib(25) = 121393
[2021-06-10 16:42:46]fib(27) = 317811
[2021-06-10 16:42:46]fib(1) = 1
[2021-06-10 16:42:46]fib(18) = 4181
[2021-06-10 16:42:46]fib(29) = 832040
[2021-06-10 16:42:46]fib(17) = 2584
[2021-06-10 16:42:46]fib(20) = 10946
[2021-06-10 16:42:46]task(5) failed:job request timed out
[2021-06-10 16:42:46]task(14) failed:job request timed out
[2021-06-10 16:42:46]task(8) failed:job request timed out
[2021-06-10 16:42:46]task(7) failed:job request timed out
[2021-06-10 16:42:46]task(13) failed:job request timed out
[2021-06-10 16:42:46]task(12) failed:job request timed out
[2021-06-10 16:42:46]task(11) failed:job request timed out
[2021-06-10 16:42:46]task(6) failed:job request timed out
context
context 是協(xié)調(diào) goroutine 的工具。tunny支持帶context.Context參數(shù)的方法:ProcessCtx()。當前 context 狀態(tài)變?yōu)?code style="font-size: 14px;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;color: rgb(239, 112, 96);">Done之后,任務(wù)也會停止執(zhí)行。context 會由于超時、取消等原因切換為Done狀態(tài)。還是拿上面的例子:
go func(i int) {
n := rand.Intn(30)
ctx, cancel := context.WithCancel(context.Background())
if i%2 == 0 {
go func() {
time.Sleep(500 * time.Millisecond)
cancel()
}()
}
result, err := p.ProcessCtx(ctx, n)
if err != nil {
fmt.Printf("task(%d) failed:%v\n", i, err)
} else {
fmt.Printf("fib(%d) = %d\n", n, result)
}
wg.Done()
}(i)
其他代碼都一樣,我們調(diào)用p.ProcessCtx()方法來執(zhí)行任務(wù)。參數(shù)是一個可取消的Context。對于序號為偶數(shù)的任務(wù),我們啟動一個 goroutine 在 500ms 之后cancel()掉這個Context。代碼運行結(jié)果如下:
$ go run main.go
task(4) failed:context canceled
task(6) failed:context canceled
task(0) failed:context canceled
task(2) failed:context canceled
fib(27) = 317811
fib(25) = 121393
fib(1) = 1
fib(18) = 4181
我們看到偶數(shù)序號的任務(wù)都被取消了。
源碼
tunny的源碼更少,除去測試代碼和注釋,連 500 行都不到。那么就一起來看一下吧。Pool結(jié)構(gòu)如下:
// src/github.com/Jeffail/tunny.go
type Pool struct {
queuedJobs int64
ctor func() Worker
workers []*workerWrapper
reqChan chan workRequest
workerMut sync.Mutex
}
Pool結(jié)構(gòu)中有一個ctor字段,這是一個函數(shù)對象,用于返回一個實現(xiàn)Worker接口的值:
type Worker interface {
Process(interface{}) interface{}
BlockUntilReady()
Interrupt()
Terminate()
}
這個接口不同的方法在任務(wù)執(zhí)行的不同階段調(diào)用。最重要的當屬Process(interface{}) interface{}方法了。這個就是執(zhí)行任務(wù)的函數(shù)。tunny提供New()方法創(chuàng)建Pool對象,這個方法需要我們自己構(gòu)造ctor函數(shù)對象,使用多有不便。tunny提供了另外兩個默認實現(xiàn)closureWorker和callbackWorker:
type closureWorker struct {
processor func(interface{}) interface{}
}
func (w *closureWorker) Process(payload interface{}) interface{} {
return w.processor(payload)
}
func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt() {}
func (w *closureWorker) Terminate() {}
type callbackWorker struct{}
func (w *callbackWorker) Process(payload interface{}) interface{} {
f, ok := payload.(func())
if !ok {
return ErrJobNotFunc
}
f()
return nil
}
func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt() {}
func (w *callbackWorker) Terminate() {}
tunny.NewFunc()方法使用的就是closureWorker:
func NewFunc(n int, f func(interface{}) interface{}) *Pool {
return New(n, func() Worker {
return &closureWorker{
processor: f,
}
})
}
創(chuàng)建的closureWorker直接將參數(shù)f作為任務(wù)處理函數(shù)。
tunny.NewCallback()方法使用callbackWorker:
func NewCallback(n int) *Pool {
return New(n, func() Worker {
return &callbackWorker{}
})
}
callbackWorker結(jié)構(gòu)中沒有處理函數(shù),只能給它發(fā)送無參無返回值的函數(shù)對象作為任務(wù),它的Process()方法就是執(zhí)行這個函數(shù)。
創(chuàng)建Pool對象后,都是調(diào)用它的SetSize()方法,設(shè)置 worker 數(shù)量。在這個方法中會啟動相應(yīng)數(shù)量的 goroutine:
func (p *Pool) SetSize(n int) {
p.workerMut.Lock()
defer p.workerMut.Unlock()
lWorkers := len(p.workers)
if lWorkers == n {
return
}
for i := lWorkers; i < n; i++ {
p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
}
// 停止過多的 worker
for i := n; i < lWorkers; i++ {
p.workers[i].stop()
}
// 等待 worker 停止
for i := n; i < lWorkers; i++ {
p.workers[i].join()
// -----------------
}
p.workers = p.workers[:n]
}
SetSize()其實在擴容和縮容的時候也會調(diào)用。對于擴容,它會創(chuàng)建相應(yīng)數(shù)量的 worker。對于縮容,它會將多余的 worker 停掉。與ants不同,tunny的擴容縮容都是即時生效的。
代碼中,我用-----------------標出來的地方我覺得有點問題。對于縮容,因為底層的數(shù)組沒有變化,workers切片長度縮小之后,數(shù)組中后面的元素實際上就訪問不到了,但是數(shù)組還持有它的引用,算是一種內(nèi)存泄漏吧。所以穩(wěn)妥起見最好加上p.workers[i] = nil?
這里創(chuàng)建的 worker 實際上是包裝了一層的workerWrapper結(jié)構(gòu):
// src/github.com/Jeffail/worker.go
type workerWrapper struct {
worker Worker
interruptChan chan struct{}
reqChan chan<- workRequest
closeChan chan struct{}
closedChan chan struct{}
}
func newWorkerWrapper(
reqChan chan<- workRequest,
worker Worker,
) *workerWrapper {
w := workerWrapper{
worker: worker,
interruptChan: make(chan struct{}),
reqChan: reqChan,
closeChan: make(chan struct{}),
closedChan: make(chan struct{}),
}
go w.run()
return &w
}
workerWrapper結(jié)構(gòu)創(chuàng)建之后會立刻調(diào)用run()方法啟動一個 goroutine:
func (w *workerWrapper) run() {
jobChan, retChan := make(chan interface{}), make(chan interface{})
defer func() {
w.worker.Terminate()
close(retChan)
close(w.closedChan)
}()
for {
w.worker.BlockUntilReady()
select {
case w.reqChan <- workRequest{
jobChan: jobChan,
retChan: retChan,
interruptFunc: w.interrupt,
}:
select {
case payload := <-jobChan:
result := w.worker.Process(payload)
select {
case retChan <- result:
case <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
case _, _ = <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
case <-w.closeChan:
return
}
}
}
每個 worker goroutine 都在嘗試向w.reqChan通道中發(fā)送一個workRequest結(jié)構(gòu)數(shù)據(jù),發(fā)送成功之后,從jobChan中獲取任務(wù)數(shù)據(jù),然后調(diào)用Worker.Process()方法執(zhí)行任務(wù),最后將結(jié)果發(fā)送到retChan通道中。這里其實有好幾個交互。需要結(jié)合Process()方法來看才更清晰:
func (p *Pool) Process(payload interface{}) interface{} {
request, open := <-p.reqChan
request.jobChan <- payload
payload, open = <-request.retChan
return payload
}
刪掉無相關(guān)的代碼,最后就是上面這樣。我們在調(diào)用池對象的Process()方法時,嘗試從通道reqChan中接收數(shù)據(jù),然后將任務(wù)數(shù)據(jù)發(fā)送到jobChan通道中,最后從retChan通道中接收結(jié)果。與上面的run流程結(jié)合來看,實際上在正常執(zhí)行一個任務(wù)時,Pool與workerWrapper有 3 次交互。
觀察Pool創(chuàng)建到workerWrapper創(chuàng)建的流程,我們可以看到實際上Pool結(jié)構(gòu)中的reqChan與workerWrapper結(jié)構(gòu)中的reqChan是同一個通道。即workerWrapper啟動后,會阻塞在向reqChan通道發(fā)送數(shù)據(jù)上,直到調(diào)用了Pool的Process*()方法,從通道reqChan取出數(shù)據(jù)。Process()方法得到workRequest會向它的jobChan通道中發(fā)送任務(wù)數(shù)據(jù)。而workerWrapper.run()方法成功發(fā)送數(shù)據(jù)到reqChan之后就開始等待從jobChan通道中接收數(shù)據(jù),這時接收到Process()方法發(fā)送過來的數(shù)據(jù)。開始執(zhí)行w.worker.Process()方法,然后向retChan通道發(fā)送結(jié)果數(shù)據(jù),Process()方法在成功發(fā)送數(shù)據(jù)到jobChan之后,就開始等待從retChan通道中接收數(shù)據(jù)。接收成功之后,Process()方法返回,workerWrapper.run()繼續(xù)阻塞在w.reqChan <-這條語句上,等待處理下一個任務(wù)。注意jobChan和retChan都是workerWrapper.run()方法中創(chuàng)建的通道。
那么超時是怎么實現(xiàn)的呢?看方法ProcessTimed()的實現(xiàn):
func (p *Pool) ProcessTimed(
payload interface{},
timeout time.Duration,
) (interface{}, error) {
tout := time.NewTimer(timeout)
var request workRequest
select {
case request, open = <-p.reqChan:
case <-tout.C:
return nil, ErrJobTimedOut
}
select {
case request.jobChan <- payload:
case <-tout.C:
request.interruptFunc()
return nil, ErrJobTimedOut
}
select {
case payload, open = <-request.retChan:
case <-tout.C:
request.interruptFunc()
return nil, ErrJobTimedOut
}
tout.Stop()
return payload, nil
}
同樣地,刪除不相干的代碼。首先,創(chuàng)建一個timer,超時時間由傳入?yún)?shù)指定。后面有 3 個select語句:
等待從 p.reqChan取數(shù)據(jù),即等待有 worker 空閑;等待發(fā)送數(shù)據(jù)到 jobChan,即等待 worker 從jobChan取出任務(wù)數(shù)據(jù);等待從 retChan取數(shù)據(jù),即等待 worker 將結(jié)果發(fā)送到retChan。
第一種情況,如果超時了,說明 worker 都處于繁忙狀態(tài),直接返回任務(wù)超時。后面兩種情況實際上是任務(wù)已經(jīng)開始執(zhí)行了,但是在規(guī)定的時間內(nèi)沒有完成。這兩種情況,需要終止任務(wù)的執(zhí)行。我們看到上面調(diào)用了workerRequest.interruptFunc()方法,也就是workerWrapper.interrupt()方法:
func (w *workerWrapper) interrupt() {
close(w.interruptChan)
w.worker.Interrupt()
}
這個方法就是簡單關(guān)閉了interrupteChan通道,然后調(diào)用worker對象的Interrupt()方法,默認實現(xiàn)中這個方法都是空的。
interruptChan通道關(guān)閉后,goroutine 中等待從jobChan接收數(shù)據(jù)和等待向retChan發(fā)送數(shù)據(jù)的操作都會取消:
select {
case payload := <-jobChan:
result := w.worker.Process(payload)
select {
case retChan <- result:
case <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
case _, _ = <-w.interruptChan:
w.interruptChan = make(chan struct{})
}
ProcessCtx()實現(xiàn)也是類似的。
最后調(diào)用workerWrapper.stop()會關(guān)閉closeChan通道,這會導致workerWrapper.run()方法中的for循環(huán)跳出,進而執(zhí)行defer函數(shù)中的close(retChan)和close(closedChan):
defer func() {
w.worker.Terminate()
close(retChan)
close(w.closedChan)
}()
這里需要關(guān)閉retChan通道是為了防止Process*()方法在等待retChan數(shù)據(jù)。
closedChan通道關(guān)閉后,workerWrapper.join()方法就返回了。
func (w *workerWrapper) join() {
<-w.closedChan
}
Worker幾個方法的調(diào)用時機:
Process():執(zhí)行任務(wù)時;Interrupt():任務(wù)因為超時會被 context 取消時;BlockUntilReady():每次執(zhí)行新任務(wù)前,可能需要準備一些資源;Terminate():workerWrapper.run()中的 defer 函數(shù)中,即停止 worker 后。
這些時機在代碼中都能清晰地看到。
基于源碼,我畫了一個流程圖:

圖中省略了中斷的流程。
tunny vs ants
tunny設(shè)計的思路與ants有較大的區(qū)別:
tunny只支持同步的方式執(zhí)行任務(wù),雖然任務(wù)在另一個 goroutine 執(zhí)行,但是提交任務(wù)的 goroutine 必須等待結(jié)果返回或超時。不能做其他事情。正是由于這一點,導致tunny的設(shè)計稍微一點復雜,而且為了支持超時和取消,設(shè)計了多個通道用于和執(zhí)行任務(wù)的 goroutine 通信。一次任務(wù)執(zhí)行的過程涉及多次通信,性能是有損失的。從另一方面說,同步的編程方式更符合人類的直覺。
ants完全是異步的任務(wù)執(zhí)行流程,相比tunny性能是稍高一些的。但是也因為它的異步特性,導致沒有任務(wù)超時、取消這些機制。而且如果需要收集結(jié)果,必須要自己編寫額外的代碼。
總結(jié)
本文介紹了另一個 goroutine 池的實現(xiàn)tunny。它以同步的方式來處理任務(wù),編寫代碼更加直觀,對任務(wù)的執(zhí)行流程有更強的控制,如超時、取消等。當然實現(xiàn)也復雜一些。tunny代碼不走 500 行,非常建議讀一讀。
大家如果發(fā)現(xiàn)好玩、好用的 Go 語言庫,歡迎到 Go 每日一庫 GitHub 上提交 issue??
參考
tunny GitHub:https://github.com/Jeffail/tunny ants GitHub:github.com/panjf2000/ants Go 每日一庫 GitHub:https://github.com/darjun/go-daily-lib
推薦閱讀
