1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Go語言的并發(fā)與WorkerPool - 第二部分

        共 10167字,需瀏覽 21分鐘

         ·

        2021-09-02 20:33

        via:
        https://hackernoon.com/concurrency-in-golang-and-workerpool-part-2-l3w31q7
        作者:Hasan

        這是系列文章的第二篇,第一篇文章點擊這里查看

        譯文如下:


        基于 goroutine 和 channel 的并發(fā)特性,使得 Go 成為了強大的并發(fā)語言。上一篇文章,我們討論了如何構(gòu)建 workerPool 來提高程序的并發(fā)性能,換句話說,避免耗盡系統(tǒng)資源。但那只是一個簡單的示例,演示我們應(yīng)該如何實現(xiàn)。

        基于對上一篇文章的學(xué)習(xí),在這篇文章里面,我們將構(gòu)建一個健壯的解決方案,以便在任何其他應(yīng)用程序里面可以使用該方案。網(wǎng)絡(luò)上有其他復(fù)雜架構(gòu)的解決方案,比如使用調(diào)度器等等。實際上,我們并不需要這些復(fù)雜的設(shè)計,僅僅使用一個共享 channel 就可以解決問題。我們一起來看下,該如何構(gòu)建呢?

        代碼結(jié)構(gòu)

        我們創(chuàng)建了一個通用的 workerPool 包,根據(jù)業(yè)務(wù)所需的并發(fā)性使用 worker 來處理任務(wù)。一起來看下目錄結(jié)構(gòu):

        workerpool
        ├── pool.go
        ├── task.go
        └── worker.go

        workerpool 目錄在項目的根目錄下。Task 是需要處理單個工作單元;Worker 是一個簡單的 worker 函數(shù),用于執(zhí)行任務(wù);而 Pool 用于創(chuàng)建、管理 workers。

        實現(xiàn)

        先看下 Task 代碼:

        // workerpool/task.go

        package workerpool

        import (
         "fmt"
        )

        type Task struct {
         Err  error
         Data interface{}
         f    func(interface{}) error
        }

        func NewTask(f func(interface{}) errordata interface{}) *Task {
         return &Task{f: f, Data: data}
        }

        func process(workerID int, task *Task) {
         fmt.Printf("Worker %d processes task %v\n", workerID, task.Data)
         task.Err = task.f(task.Data)
        }

        Task 是一個簡單的結(jié)構(gòu)體,保存處理任務(wù)所需要的一切數(shù)據(jù)。創(chuàng)建 task 時,傳遞了 Data 和待執(zhí)行函數(shù) f,process() 函數(shù)會處理任務(wù)。處理任務(wù)時,將 Data 作為參數(shù)傳遞給函數(shù) f,并將執(zhí)行結(jié)果保存在 Task.Err 里。

        我們來看下 Worker 是如何處理任務(wù)的:

        // workerpool/worker.go

        package workerpool

        import (
         "fmt"
         "sync"
        )

        // Worker handles all the work
        type Worker struct {
         ID       int
         taskChan chan *Task
        }

        // NewWorker returns new instance of worker
        func NewWorker(channel chan *Task, ID int) *Worker {
         return &Worker{
          ID:       ID,
          taskChan: channel,
         }
        }

        // Start starts the worker
        func (wr *Worker) Start(wg *sync.WaitGroup) {
         fmt.Printf("Starting worker %d\n", wr.ID)

         wg.Add(1)
         go func() {
          defer wg.Done()
          for task := range wr.taskChan {
           process(wr.ID, task)
          }
         }()
        }

        我們創(chuàng)建了一個小巧的 Worker 結(jié)構(gòu)體,包含 worker ID 和 一個保存待處理任務(wù)的 channel。在 Start() 方法里,使用 for range 從 taskChan 讀取任務(wù)并處理。可以想象的到,多個 worker 可以并發(fā)地執(zhí)行任務(wù)。

        workerPool

        我們通過實現(xiàn) Task 和 Worker 來處理任務(wù),但是好像還缺點什么東西,誰負責(zé)生成這些 worker 并將任務(wù)發(fā)送給它們?答案是:Worker Pool。

        // workerpoo/pool.go

        package workerpool

        import (
         "fmt"
         "sync"
         "time"
        )

        // Pool is the worker pool
        type Pool struct {
         Tasks   []*Task

         concurrency   int
         collector     chan *Task
         wg            sync.WaitGroup
        }

        // NewPool initializes a new pool with the given tasks and
        // at the given concurrency.
        func NewPool(tasks []*Task, concurrency int) *Pool {
         return &Pool{
          Tasks:       tasks,
          concurrency: concurrency,
          collector:   make(chan *Task, 1000),
         }
        }

        // Run runs all work within the pool and blocks until it's
        // finished.
        func (p *Pool) Run() {
         for i := 1; i <= p.concurrency; i++ {
          worker := NewWorker(p.collector, i)
          worker.Start(&p.wg)
         }

         for i := range p.Tasks {
          p.collector <- p.Tasks[i]
         }
         close(p.collector)

         p.wg.Wait()
        }

        上面的代碼,pool 保存了所有待處理的任務(wù),并且生成與 concurrency 數(shù)量一致的 goroutine,用于并發(fā)地處理任務(wù)。workers 之間共享緩存 channel -- collector。

        所以,當(dāng)我們把這個工作池跑起來時,可以生成滿足所需數(shù)量的 worker,workers 之間共享 collector channel。接著,使用 for range 讀取 tasks,并將讀取到的 task 寫入 collector 里。我們使用 sync.WaitGroup 實現(xiàn)協(xié)程之間的同步。現(xiàn)在我們有了一個很好的解決方案,一起來測試下。

        // main.go

        package main

        import (
         "fmt"
         "time"

         "github.com/Joker666/goworkerpool/workerpool"
        )

        func main() {
         var allTask []*workerpool.Task
         for i := 1; i <= 100; i++ {
          task := workerpool.NewTask(func(data interface{}) error {
           taskID := data.(int)
           time.Sleep(100 * time.Millisecond)
           fmt.Printf("Task %d processed\n", taskID)
           return nil
          }, i)
          allTask = append(allTask, task)
         }

         pool := workerpool.NewPool(allTask, 5)
         pool.Run()
        }

        上面的代碼,創(chuàng)建了 100 個任務(wù)并且使用 5 個并發(fā)處理這些任務(wù)。

        輸出如下:

        Worker 3 processes task 98
        Task 92 processed
        Worker 2 processes task 99
        Task 98 processed
        Worker 5 processes task 100
        Task 99 processed
        Task 100 processed
        Took ===============> 2.0056295s

        處理 100 個任務(wù)花費了 2s,如何我們將并發(fā)數(shù)提高到 10,我們會看到處理完所有任務(wù)只需要大約 1s。

        我們通過實現(xiàn) workerPool 構(gòu)建了一個健壯的解決方案,具有并發(fā)性、錯誤處理、數(shù)據(jù)處理等功能。這是個通用的包,不耦合具體的實現(xiàn)。我們可以使用它來解決一些大問題。

        進一步擴展:后臺處理任務(wù)

        實際上,我們還可以進一步擴展上面的解決方案,以便 worker 可以在后臺等待我們投遞新的任務(wù)并處理。為此,代碼需要做一些修改,Task 結(jié)構(gòu)體保持不變,但是需要小改下 Worker,看下面代碼:

        // workerpool/worker.go

        // Worker handles all the work
        type Worker struct {
         ID       int
         taskChan chan *Task
         quit     chan bool
        }

        // NewWorker returns new instance of worker
        func NewWorker(channel chan *Task, ID int) *Worker {
         return &Worker{
          ID:       ID,
          taskChan: channel,
          quit:     make(chan bool),
         }
        }

        ....

        // StartBackground starts the worker in background waiting
        func (wr *Worker) StartBackground() {
         fmt.Printf("Starting worker %d\n", wr.ID)

         for {
          select {
          case task := <-wr.taskChan:
           process(wr.ID, task)
          case <-wr.quit:
           return
          }
         }
        }

        // Stop quits the worker
        func (wr *Worker) Stop() {
         fmt.Printf("Closing worker %d\n", wr.ID)
         go func() {
          wr.quit <- true
         }()
        }

        Worker 結(jié)構(gòu)體新加 quit channel,并且新加了兩個方法。StartBackgorund() 在 for 循環(huán)里使用 select-case 從 taskChan 隊列讀取任務(wù)并處理,如果從 quit 讀取到結(jié)束信號就立即返回。Stop() 方法負責(zé)往 quit 寫入結(jié)束信號。

        添加完這兩個新的方法之后,我們來修改下 Pool:

        // workerpool/pool.go

        type Pool struct {
         Tasks   []*Task
         Workers []*Worker

         concurrency   int
         collector     chan *Task
         runBackground chan bool
         wg            sync.WaitGroup
        }

        // AddTask adds a task to the pool
        func (p *Pool) AddTask(task *Task) {
         p.collector <- task
        }

        // RunBackground runs the pool in background
        func (p *Pool) RunBackground() {
         go func() {
          for {
           fmt.Print("? Waiting for tasks to come in ...\n")
           time.Sleep(10 * time.Second)
          }
         }()

         for i := 1; i <= p.concurrency; i++ {
          worker := NewWorker(p.collector, i)
          p.Workers = append(p.Workers, worker)
          go worker.StartBackground()
         }

         for i := range p.Tasks {
          p.collector <- p.Tasks[i]
         }

         p.runBackground = make(chan bool)
         <-p.runBackground
        }

        // Stop stops background workers
        func (p *Pool) Stop() {
         for i := range p.Workers {
          p.Workers[i].Stop()
         }
         p.runBackground <- true
        }

        Pool 結(jié)構(gòu)體添加了兩個成員:Workers 和 runBackground,Workers 保存所有的 worker,runBackground 用于維持 pool 存活狀態(tài)。

        添加了三個新的方法,AddTask() 方法用于往 collector 添加任務(wù);RunBackground() 方法衍生出一個無限運行的 goroutine,以便 pool 維持存活狀態(tài),因為 runBackground 信道是空,讀取空的 channel 會阻塞,所以 pool 能維持運行狀態(tài)。接著,在協(xié)程里面啟動 worker;Stop() 方法用于停止 worker,并且給 runBackground 發(fā)送停止信號以便結(jié)束 RunBackground() 方法。

        我們來看下具體是如何工作的。

        如果是在現(xiàn)實的業(yè)務(wù)場景中,pool 將會與 HTTP 服務(wù)器一塊運行并消耗任務(wù)。我們通過 for 無限循環(huán)模擬這種這種場景,如果滿足某一條件,pool 將會停止。

        // main.go

        ...

        pool := workerpool.NewPool(allTask, 5)
        go func() {
         for {
          taskID := rand.Intn(100) + 20

          if taskID%7 == 0 {
           pool.Stop()
          }

          time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
          task := workerpool.NewTask(func(data interface{}) error {
           taskID := data.(int)
           time.Sleep(100 * time.Millisecond)
           fmt.Printf("Task %d processed\n", taskID)
           return nil
          }, taskID)
          pool.AddTask(task)
         }
        }()
        pool.RunBackground()

        當(dāng)執(zhí)行上面的代碼時,我們就會看到有隨機的 task 被投遞到后臺運行的 workers,其中某一個 worker 會讀取到任務(wù)并完成處理。當(dāng)滿足某一條件時,程序便會停止退出。

        總結(jié)

        基于上一篇文章的初步解決方案,這篇文章討論了通過 workPool 構(gòu)建一個強大的解決方案。同時,我們進一步擴展了該方案,實現(xiàn)后臺運行 pool 并處理投遞的任務(wù)。

        點擊【閱讀原文】直達代碼倉庫[1]。

        參考資料

        [1]

        代碼倉庫: https://github.com/Joker666/goworkerpool



        推薦閱讀


        福利

        我為大家整理了一份從入門到進階的Go學(xué)習(xí)資料禮包,包含學(xué)習(xí)建議:入門看什么,進階看什么。關(guān)注公眾號 「polarisxu」,回復(fù) ebook 獲??;還可以回復(fù)「進群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。

        瀏覽 83
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            宅男噜噜噜66国产在线观看 | 操逼视频导航 | 俺来也在线视频 | 亚洲国产精品一区二区久久阿宾 | 国产精品 女同 | 女生被艹在线观看 | 日韩成人无码免费一区二区视频 | 日本三级香港三级 | 关之林大尺度床戏做爰 | 午夜男女羞羞视频 |