Go語言的并發(fā)與WorkerPool - 第二部分
點(diǎn)擊上方“Go語言進(jìn)階學(xué)習(xí)”,進(jìn)行關(guān)注
回復(fù)“Go語言”即可獲贈從入門到進(jìn)階共10本電子書
via:
https://hackernoon.com/concurrency-in-golang-and-workerpool-part-2-l3w31q7
作者:Hasan
四哥水平有限,如果有翻譯或理解錯(cuò)誤的點(diǎn),煩請幫忙指出,感謝!
這是系列文章的第二篇,第一篇文章點(diǎn)擊這里查看。
原文如下:
基于 goroutine 和 channel 的并發(fā)特性,使得 Go 成為了強(qiáng)大的并發(fā)語言。上一篇文章,我們討論了如何構(gòu)建 workerPool 來提高程序的并發(fā)性能,換句話說,避免耗盡系統(tǒng)資源。但那只是一個(gè)簡單的示例,演示我們應(yīng)該如何實(shí)現(xiàn)。
基于對上一篇文章的學(xué)習(xí),在這篇文章里面,我們將構(gòu)建一個(gè)健壯的解決方案,以便在任何其他應(yīng)用程序里面可以使用該方案。網(wǎng)絡(luò)上有其他復(fù)雜架構(gòu)的解決方案,比如使用調(diào)度器等等。實(shí)際上,我們并不需要這些復(fù)雜的設(shè)計(jì),僅僅使用一個(gè)共享 channel 就可以解決問題。我們一起來看下,該如何構(gòu)建呢?
代碼結(jié)構(gòu)
我們創(chuàng)建了一個(gè)通用的 workerPool 包,根據(jù)業(yè)務(wù)所需的并發(fā)性使用 worker 來處理任務(wù)。一起來看下目錄結(jié)構(gòu):
workerpool
├── pool.go
├── task.go
└── worker.go
workerpool 目錄在項(xiàng)目的根目錄下。Task 是需要處理單個(gè)工作單元;Worker 是一個(gè)簡單的 worker 函數(shù),用于執(zhí)行任務(wù);而 Pool 用于創(chuàng)建、管理 workers。
實(shí)現(xiàn)
先看下 Task 代碼:
// workerpool/task.go
package workerpool
import (
"fmt"
)
type Task struct {
Err error
Data interface{}
f func(interface{}) error
}
func NewTask(f func(interface{}) error, data interface{}) *Task {
return &Task{f: f, Data: data}
}
func process(workerID int, task *Task) {
fmt.Printf("Worker %d processes task %v\n", workerID, task.Data)
task.Err = task.f(task.Data)
}
Task 是一個(gè)簡單的結(jié)構(gòu)體,保存處理任務(wù)所需要的一切數(shù)據(jù)。創(chuàng)建 task 時(shí),傳遞了 Data 和待執(zhí)行函數(shù) f,process() 函數(shù)會處理任務(wù)。處理任務(wù)時(shí),將 Data 作為參數(shù)傳遞給函數(shù) f,并將執(zhí)行結(jié)果保存在 Task.Err 里。
我們來看下 Worker 是如何處理任務(wù)的:
// workerpool/worker.go
package workerpool
import (
"fmt"
"sync"
)
// Worker handles all the work
type Worker struct {
ID int
taskChan chan *Task
}
// NewWorker returns new instance of worker
func NewWorker(channel chan *Task, ID int) *Worker {
return &Worker{
ID: ID,
taskChan: channel,
}
}
// Start starts the worker
func (wr *Worker) Start(wg *sync.WaitGroup) {
fmt.Printf("Starting worker %d\n", wr.ID)
wg.Add(1)
go func() {
defer wg.Done()
for task := range wr.taskChan {
process(wr.ID, task)
}
}()
}
我們創(chuàng)建了一個(gè)小巧的 Worker 結(jié)構(gòu)體,包含 worker ID 和 一個(gè)保存待處理任務(wù)的 channel。在 Start() 方法里,使用 for range 從 taskChan 讀取任務(wù)并處理??梢韵胂蟮牡?,多個(gè) worker 可以并發(fā)地執(zhí)行任務(wù)。
workerPool
我們通過實(shí)現(xiàn) Task 和 Worker 來處理任務(wù),但是好像還缺點(diǎn)什么東西,誰負(fù)責(zé)生成這些 worker 并將任務(wù)發(fā)送給它們?答案是:Worker Pool。
// workerpoo/pool.go
package workerpool
import (
"fmt"
"sync"
"time"
)
// Pool is the worker pool
type Pool struct {
Tasks []*Task
concurrency int
collector chan *Task
wg sync.WaitGroup
}
// NewPool initializes a new pool with the given tasks and
// at the given concurrency.
func NewPool(tasks []*Task, concurrency int) *Pool {
return &Pool{
Tasks: tasks,
concurrency: concurrency,
collector: make(chan *Task, 1000),
}
}
// Run runs all work within the pool and blocks until it's
// finished.
func (p *Pool) Run() {
for i := 1; i <= p.concurrency; i++ {
worker := NewWorker(p.collector, i)
worker.Start(&p.wg)
}
for i := range p.Tasks {
p.collector <- p.Tasks[i]
}
close(p.collector)
p.wg.Wait()
}
上面的代碼,pool 保存了所有待處理的任務(wù),并且生成與 concurrency 數(shù)量一致的 goroutine,用于并發(fā)地處理任務(wù)。workers 之間共享緩存 channel -- collector。
所以,當(dāng)我們把這個(gè)工作池跑起來時(shí),可以生成滿足所需數(shù)量的 worker,workers 之間共享 collector channel。接著,使用 for range 讀取 tasks,并將讀取到的 task 寫入 collector 里。我們使用 sync.WaitGroup 實(shí)現(xiàn)協(xié)程之間的同步?,F(xiàn)在我們有了一個(gè)很好的解決方案,一起來測試下。
// main.go
package main
import (
"fmt"
"time"
"github.com/Joker666/goworkerpool/workerpool"
)
func main() {
var allTask []*workerpool.Task
for i := 1; i <= 100; i++ {
task := workerpool.NewTask(func(data interface{}) error {
taskID := data.(int)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Task %d processed\n", taskID)
return nil
}, i)
allTask = append(allTask, task)
}
pool := workerpool.NewPool(allTask, 5)
pool.Run()
}
上面的代碼,創(chuàng)建了 100 個(gè)任務(wù)并且使用 5 個(gè)并發(fā)處理這些任務(wù)。
輸出如下:
Worker 3 processes task 98
Task 92 processed
Worker 2 processes task 99
Task 98 processed
Worker 5 processes task 100
Task 99 processed
Task 100 processed
Took ===============> 2.0056295s
處理 100 個(gè)任務(wù)花費(fèi)了 2s,如何我們將并發(fā)數(shù)提高到 10,我們會看到處理完所有任務(wù)只需要大約 1s。
我們通過實(shí)現(xiàn) workerPool 構(gòu)建了一個(gè)健壯的解決方案,具有并發(fā)性、錯(cuò)誤處理、數(shù)據(jù)處理等功能。這是個(gè)通用的包,不耦合具體的實(shí)現(xiàn)。我們可以使用它來解決一些大問題。
進(jìn)一步擴(kuò)展:后臺處理任務(wù)
實(shí)際上,我們還可以進(jìn)一步擴(kuò)展上面的解決方案,以便 worker 可以在后臺等待我們投遞新的任務(wù)并處理。為此,代碼需要做一些修改,Task 結(jié)構(gòu)體保持不變,但是需要小改下 Worker,看下面代碼:
// workerpool/worker.go
// Worker handles all the work
type Worker struct {
ID int
taskChan chan *Task
quit chan bool
}
// NewWorker returns new instance of worker
func NewWorker(channel chan *Task, ID int) *Worker {
return &Worker{
ID: ID,
taskChan: channel,
quit: make(chan bool),
}
}
....
// StartBackground starts the worker in background waiting
func (wr *Worker) StartBackground() {
fmt.Printf("Starting worker %d\n", wr.ID)
for {
select {
case task := <-wr.taskChan:
process(wr.ID, task)
case <-wr.quit:
return
}
}
}
// Stop quits the worker
func (wr *Worker) Stop() {
fmt.Printf("Closing worker %d\n", wr.ID)
go func() {
wr.quit <- true
}()
}
Worker 結(jié)構(gòu)體新加 quit channel,并且新加了兩個(gè)方法。StartBackgorund() 在 for 循環(huán)里使用 select-case 從 taskChan 隊(duì)列讀取任務(wù)并處理,如果從 quit 讀取到結(jié)束信號就立即返回。Stop() 方法負(fù)責(zé)往 quit 寫入結(jié)束信號。
添加完這兩個(gè)新的方法之后,我們來修改下 Pool:
// workerpool/pool.go
type Pool struct {
Tasks []*Task
Workers []*Worker
concurrency int
collector chan *Task
runBackground chan bool
wg sync.WaitGroup
}
// AddTask adds a task to the pool
func (p *Pool) AddTask(task *Task) {
p.collector <- task
}
// RunBackground runs the pool in background
func (p *Pool) RunBackground() {
go func() {
for {
fmt.Print("? Waiting for tasks to come in ...\n")
time.Sleep(10 * time.Second)
}
}()
for i := 1; i <= p.concurrency; i++ {
worker := NewWorker(p.collector, i)
p.Workers = append(p.Workers, worker)
go worker.StartBackground()
}
for i := range p.Tasks {
p.collector <- p.Tasks[i]
}
p.runBackground = make(chan bool)
<-p.runBackground
}
// Stop stops background workers
func (p *Pool) Stop() {
for i := range p.Workers {
p.Workers[i].Stop()
}
p.runBackground <- true
}
Pool 結(jié)構(gòu)體添加了兩個(gè)成員:Workers 和 runBackground,Workers 保存所有的 worker,runBackground 用于維持 pool 存活狀態(tài)。
添加了三個(gè)新的方法,AddTask() 方法用于往 collector 添加任務(wù);RunBackground() 方法衍生出一個(gè)無限運(yùn)行的 goroutine,以便 pool 維持存活狀態(tài),因?yàn)?runBackground 信道是空,讀取空的 channel 會阻塞,所以 pool 能維持運(yùn)行狀態(tài)。接著,在協(xié)程里面啟動 worker;Stop() 方法用于停止 worker,并且給 runBackground 發(fā)送停止信號以便結(jié)束 RunBackground() 方法。
我們來看下具體是如何工作的。
如果是在現(xiàn)實(shí)的業(yè)務(wù)場景中,pool 將會與 HTTP 服務(wù)器一塊運(yùn)行并消耗任務(wù)。我們通過 for 無限循環(huán)模擬這種這種場景,如果滿足某一條件,pool 將會停止。
// main.go
...
pool := workerpool.NewPool(allTask, 5)
go func() {
for {
taskID := rand.Intn(100) + 20
if taskID%7 == 0 {
pool.Stop()
}
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
task := workerpool.NewTask(func(data interface{}) error {
taskID := data.(int)
time.Sleep(100 * time.Millisecond)
fmt.Printf("Task %d processed\n", taskID)
return nil
}, taskID)
pool.AddTask(task)
}
}()
pool.RunBackground()
當(dāng)執(zhí)行上面的代碼時(shí),我們就會看到有隨機(jī)的 task 被投遞到后臺運(yùn)行的 workers,其中某一個(gè) worker 會讀取到任務(wù)并完成處理。當(dāng)滿足某一條件時(shí),程序便會停止退出。
總結(jié)
基于上一篇文章的初步解決方案,這篇文章討論了通過 workPool 構(gòu)建一個(gè)強(qiáng)大的解決方案。同時(shí),我們進(jìn)一步擴(kuò)展了該方案,實(shí)現(xiàn)后臺運(yùn)行 pool 并處理投遞的任務(wù)。
點(diǎn)擊【閱讀原文】直達(dá)代碼倉庫[1]。
參考資料
代碼倉庫: https://github.com/Joker666/goworkerpool
------------------- End -------------------
往期精彩文章推薦:

歡迎大家點(diǎn)贊,留言,轉(zhuǎn)發(fā),轉(zhuǎn)載,感謝大家的相伴與支持
想加入Go學(xué)習(xí)群請?jiān)诤笈_回復(fù)【入群】
萬水千山總是情,點(diǎn)個(gè)【在看】行不行
