1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        【GoCN酷Go推薦】 errgroup 并發(fā)小工具

        共 9214字,需瀏覽 19分鐘

         ·

        2021-05-05 10:03

        使用場景:微服務(wù)中的并發(fā)請(qǐng)求

        并發(fā)編程是Golang語言的強(qiáng)大特性之一。在微服務(wù)架構(gòu)中,面對(duì)用戶的請(qǐng)求,我們常常需要向下游請(qǐng)求大量的數(shù)據(jù)繼而組裝成所需數(shù)據(jù),不同的數(shù)據(jù)很可能會(huì)由不同的服務(wù)提供,這里一一請(qǐng)求顯然是效率十分低效的,所以并發(fā)成為提高響應(yīng)效率的優(yōu)選方法。

        errgroup庫

        基礎(chǔ)版本安裝

        go get -u golang.org/x/sync/errgroup

        加強(qiáng)版本https://github.com/go-kratos/kratos/tree/v1.0.x/pkg/sync/errgroup

        演變歷程

        channel版本

            res_ch := make(chan interface{},3)
            go func() {
                r := funA()
                res_ch <- r
            }()
            go func() {
                r := funB()
                res_ch <- r
            }()
            go func() {
                r := funC()
                res_ch <- r
            }()
            res := make([]interface{},0,3)
            for i := 0; i < 3; i++ {
                data := <- res_ch
                res = append(res,data)
            }

        此版本運(yùn)用了官方推薦的用于goroutine通信的channel結(jié)構(gòu)。預(yù)計(jì)完整接收goroutine的結(jié)果。

        問題1:goroutine數(shù)量控制較為繁瑣

        問題2:若goroutine內(nèi)部發(fā)生錯(cuò)誤,會(huì)導(dǎo)致接收程序阻塞,無法正常退出

        基本版本errgroup

        源碼

            //源代碼結(jié)構(gòu)
            type Group struct {
             cancel func()

             wg sync.WaitGroup

             errOnce sync.Once
             err     error
            }

            func WithContext(ctx context.Context) (*Group, context.Context) {
                ctx, cancel := context.WithCancel(ctx)
                return &Group{cancel: cancel}, ctx
            }

            func (g *Group) Wait() error {
                g.wg.Wait()
                if g.cancel != nil {
                    g.cancel()
                }
                return g.err
            }

            func (g *Group) Go(f func() error) {
                g.wg.Add(1)

                go func() {
                    defer g.wg.Done()

                    if err := f(); err != nil {
                        g.errOnce.Do(func() {
                            g.err = err
                            if g.cancel != nil {
                                g.cancel()
                            }
                        })
                    }
                }()
            }

        閱讀源碼我們可以得知,Group結(jié)構(gòu)中使用sync.WaitGroup來控制goroutine的并發(fā),成員變量err來記錄運(yùn)行中發(fā)生的錯(cuò)誤,這里只記錄第一次返回的錯(cuò)誤值。

        使用

        group,ctx := errgroup.WithContent(context.Background())
        urls :=[]string{
            ...
        }
        for _,v := range urls {
            group.Go(func()error{
                resp,err := http.Get(v)
                if err != nil {
                    resp.Body.Close()
                }
                ...
                return err
            })
        }
        if err := g.Wait();err != nil {
            fmt.Println(err)
        }

        一些說明

        • Wait函數(shù)在所有g(shù)oroutine運(yùn)行結(jié)束才會(huì)返回,返回值記錄了第一個(gè)發(fā)生的錯(cuò)誤。
        • WithContext函數(shù)的第二返回值為ctx,Group會(huì)在goroutine發(fā)生錯(cuò)誤時(shí)調(diào)用與ctx對(duì)應(yīng)的cancel函數(shù),所以ctx不適合作為其他調(diào)用的參數(shù)。

        加強(qiáng)版本

        下面是kratos的errgroup加強(qiáng)版,其針對(duì)幾個(gè)問題作出的改進(jìn)。

        //基礎(chǔ)版本
        type Group struct {
         cancel func()

            wg sync.WaitGroup

            errOnce sync.Once
            err     error
        }    

        //kratos 版本
        type Group struct {
            err     error
            wg      sync.WaitGroup
            errOnce sync.Once

            workerOnce sync.Once
            ch         chan func(ctx context.Context) error
            chs        []func(ctx context.Context) error

            ctx    context.Context
            cancel func()
        }

        我們先從結(jié)構(gòu)體定義的角度來看待加強(qiáng)點(diǎn)。

        • ch、chs、workerOnce用于控制goroutine的并發(fā)數(shù)量,在基礎(chǔ)版的代碼中我們發(fā)現(xiàn)在使用Go(function()error)函數(shù)的調(diào)用過程中是全開放的,即對(duì)于同時(shí)進(jìn)行的goroutine數(shù)量并沒有做限制。kratos在基礎(chǔ)版本的基礎(chǔ)上添加了一個(gè)chan控制并發(fā)數(shù)量,一個(gè)slice來緩存為并發(fā)的函數(shù)指針。
        • kratos將產(chǎn)生的context對(duì)象緩存,并且更改了方法Go的函數(shù)簽名加入了context參數(shù),即func (g *Group) Go(f func(ctx context.Context) error)。在基礎(chǔ)版本中,當(dāng)error發(fā)生的是時(shí)候函數(shù),仍然需要等到所有g(shù)oroutine運(yùn)行結(jié)束才會(huì)返回,kratos的Group可以使用成員函數(shù)ctx作為參數(shù),從而控制全部并發(fā)的生命周期

        控制并發(fā)數(shù)量源碼分析

        func (g *Group) Go(f func(ctx context.Context) error) {
         g.wg.Add(1)
         if g.ch != nil {
          select {
          case g.ch <- f:
          default:
           g.chs = append(g.chs, f)
          }
          return
         }
         go g.do(f)
        }

        func (g *Group) GOMAXPROCS(n int) {
         if n <= 0 {
          panic("errgroup: GOMAXPROCS must great than 0")
         }
         g.workerOnce.Do(func() {
          g.ch = make(chan func(context.Context) error, n)
          for i := 0; i < n; i++ {
           go func() {
            for f := range g.ch {
             g.do(f)
            }
           }()
          }
         })
        }

        func (g *Group) Wait() error {
         if g.ch != nil {
          for _, f := range g.chs {
           g.ch <- f
          }
         }
         g.wg.Wait()
         if g.ch != nil {
          close(g.ch) // let all receiver exit
         }
         if g.cancel != nil {
          g.cancel()
         }
         return g.err
        }

        從Go函數(shù)中我們看到,當(dāng)g.ch != nil時(shí),f函數(shù)首先嘗試進(jìn)入g.ch中,當(dāng)g.ch滿的時(shí)候存入g.chs中,這就是上面提到的,利用chan控制并發(fā)數(shù)量,利用slice作為函數(shù)指針的緩存。

        GOMAXPROCE 函數(shù)初始化g.ch用于開啟并發(fā)數(shù)量控制的開關(guān)。并且啟動(dòng)n個(gè)goroutine來消費(fèi)傳入的函數(shù)。

        Wait函數(shù)中會(huì)不斷將緩存中的函數(shù)不斷壓入chan中進(jìn)行消費(fèi)。

        使用案例

        func sleep1s(context.Context) error {
         time.Sleep(time.Second)
         return nil
        }   

        {
            ...
            g := Group{}
            g.GOMAXPROCS(2)//開啟并發(fā)控制
            g.Go(sleep1s)
            g.Go(sleep1s)
            g.Go(sleep1s)
            g.Go(sleep1s)
            g.Wait()
            ....
        }


        總結(jié)


        errgroup 在sync.WaitGroup的功能之上添加了錯(cuò)誤傳遞,以及在發(fā)生不可恢復(fù)的錯(cuò)誤時(shí)取消整個(gè)goroutine集合的功能(返回值cancel)。

        kratos的加強(qiáng)版errgroup從統(tǒng)一goroutine控制,defer錯(cuò)誤捕獲,并發(fā)數(shù)量控制等方面對(duì)errgroup進(jìn)行了功能擴(kuò)充,利用匿名函數(shù)的參數(shù)context.Context的參數(shù)傳遞從整體上控制goroutine的生命周期。

        參考資料

        https://github.com/golang/sync/blob/master/errgroup/errgroup.go

        https://github.com/go-kratos/kratos/tree/v1.0.x/pkg/sync/errgrou


        還想了解更多嗎?

            更多請(qǐng)查看:https://github.com/golang/sync/blob/master/errgroup/errgroup.go    歡迎加入我們GOLANG中國社區(qū):https://gocn.vip/


        《酷Go推薦》招募:


        各位Gopher同學(xué),最近我們社區(qū)打算推出一個(gè)類似GoCN每日新聞的新欄目《酷Go推薦》,主要是每周推薦一個(gè)庫或者好的項(xiàng)目,然后寫一點(diǎn)這個(gè)庫使用方法或者優(yōu)點(diǎn)之類的,這樣可以真正的幫助到大家能夠?qū)W習(xí)到新的庫,并且知道怎么用。


        大概規(guī)則和每日新聞?lì)愃?,如果?bào)名人多的話每個(gè)人一個(gè)月輪到一次,歡迎大家報(bào)名?。▓?bào)名地址:https://wj.qq.com/s2/7734329/3f51)





        ??  各位Gopher們,注意啦!

        別忘了還有 Gopher China2021 大會(huì)

        還沒報(bào)名的童鞋們趕快抓住最后的機(jī)會(huì)?。?!


        點(diǎn)擊這里閱讀原文,即刻報(bào)名~
        瀏覽 33
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            91无遮挡 | 国产三级做爰在线播放 | 成人免费视频播放成人无码免费视频播放 | 婷婷乱伦 | xxxx电影 | 国产精品动漫一区二区三区 | 成人男人免费毛片 | 少妇做受 高潮10在线 | 日日夜夜影音先锋 | 欧美又粗又大又长 |