1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        通過(guò) MapReduce 降低服務(wù)響應(yīng)時(shí)間

        共 1810字,需瀏覽 4分鐘

         ·

        2020-09-24 12:17

        在微服務(wù)中開(kāi)發(fā)中,api 網(wǎng)關(guān)扮演對(duì)外提供 restful api 的角色,而 api 的數(shù)據(jù)往往會(huì)依賴(lài)其他服務(wù),復(fù)雜的 api 更是會(huì)依賴(lài)多個(gè)甚至數(shù)十個(gè)服務(wù)。雖然單個(gè)被依賴(lài)服務(wù)的耗時(shí)一般都比較低,但如果多個(gè)服務(wù)串行依賴(lài)的話(huà)那么整個(gè) api 的耗時(shí)將會(huì)大大增加。

        那么通過(guò)什么手段來(lái)優(yōu)化呢?我們首先想到的是通過(guò)并發(fā)來(lái)的方式來(lái)處理依賴(lài),這樣就能降低整個(gè)依賴(lài)的耗時(shí),Go 基礎(chǔ)庫(kù)中為我們提供了 WaitGroup 工具用來(lái)進(jìn)行并發(fā)控制,但實(shí)際業(yè)務(wù)場(chǎng)景中多個(gè)依賴(lài)如果有一個(gè)出錯(cuò)我們期望能立即返回而不是等所有依賴(lài)都執(zhí)行完再返回結(jié)果,而且 WaitGroup 中對(duì)變量的賦值往往需要加鎖,每個(gè)依賴(lài)函數(shù)都需要添加 Add 和 Done 對(duì)于新手來(lái)說(shuō)比較容易出錯(cuò)

        基于以上的背景,go-zero 框架中為我們提供了并發(fā)處理工具M(jìn)apReduce,該工具開(kāi)箱即用,不需要做什么初始化,我們通過(guò)下圖看下使用 MapReduce 和沒(méi)使用的耗時(shí)對(duì)比:


        相同的依賴(lài),串行處理的話(huà)需要 200ms,使用 MapReduce 后的耗時(shí)等于所有依賴(lài)中最大的耗時(shí)為 100ms,可見(jiàn) MapReduce 可以大大降低服務(wù)耗時(shí),而且隨著依賴(lài)的增加效果就會(huì)越明顯,減少處理耗時(shí)的同時(shí)并不會(huì)增加服務(wù)器壓力

        并發(fā)處理工具M(jìn)apReduce

        MapReduce是 Google 提出的一個(gè)軟件架構(gòu),用于大規(guī)模數(shù)據(jù)集的并行運(yùn)算,go-zero 中的 MapReduce 工具正是借鑒了這種架構(gòu)思想

        go-zero 框架中的 MapReduce 工具主要用來(lái)對(duì)批量數(shù)據(jù)進(jìn)行并發(fā)的處理,以此來(lái)提升服務(wù)的性能



        我們通過(guò)幾個(gè)示例來(lái)演示 MapReduce 的用法

        MapReduce 主要有三個(gè)參數(shù),第一個(gè)參數(shù)為 generate 用以生產(chǎn)數(shù)據(jù),第二個(gè)參數(shù)為 mapper 用以對(duì)數(shù)據(jù)進(jìn)行處理,第三個(gè)參數(shù)為 reducer 用以對(duì) mapper 后的數(shù)據(jù)做聚合返回,還可以通過(guò) opts 選項(xiàng)設(shè)置并發(fā)處理的線(xiàn)程數(shù)量

        場(chǎng)景一: 某些功能的結(jié)果往往需要依賴(lài)多個(gè)服務(wù),比如商品詳情的結(jié)果往往會(huì)依賴(lài)用戶(hù)服務(wù)、庫(kù)存服務(wù)、訂單服務(wù)等等,一般被依賴(lài)的服務(wù)都是以 rpc 的形式對(duì)外提供,為了降低依賴(lài)的耗時(shí)我們往往需要對(duì)依賴(lài)做并行處理

        type ProductDetail struct {    User  interface{}    Store interface{}    Order interface{}}
        func productDetail(uid, pid int64) (*ProductDetail, error) { var pd ProductDetail err := mr.Finish(func() (err error) { pd.User, err = userRpc.User(uid) return }, func() (err error) { pd.Store, err = storeRpc.Store(pid) return }, func() (err error) { pd.Order, err = orderRpc.Order(pid) return })
        if err != nil { log.Printf("product detail error: %v", err) return nil, err }
        return &pd, nil}

        該示例中返回商品詳情依賴(lài)了多個(gè)服務(wù)獲取數(shù)據(jù),因此做并發(fā)的依賴(lài)處理,對(duì)接口的性能有很大的提升

        場(chǎng)景二: 很多時(shí)候我們需要對(duì)一批數(shù)據(jù)進(jìn)行處理,比如對(duì)一批用戶(hù) id,效驗(yàn)每個(gè)用戶(hù)的合法性并且效驗(yàn)過(guò)程中有一個(gè)出錯(cuò)就認(rèn)為效驗(yàn)失敗,返回的結(jié)果為效驗(yàn)合法的用戶(hù) id

        func checkLegal(uids []int64) ([]int64, error) {    r, err := mr.MapReduce(func(source chan<- interface{}) {        for _, uid := range uids {            source <- uid        }    }, func(item interface{}, writer mr.Writer, cancel func(error)) {        uid := item.(int64)        ok, err := check(uid)        if err != nil {            cancel(err)        }        if ok {            writer.Write(uid)        }    }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {        var uids []int64        for p := range pipe {            uids = append(uids, p.(int64))        }        writer.Write(uids)    })    if err != nil {        log.Printf("check error: %v", err)        return nil, err    }
        return r.([]int64), nil}
        func check(uid int64) (bool, error) { // do something check user legal return true, nil}


        該示例中,如果 check 過(guò)程出現(xiàn)錯(cuò)誤則通過(guò) cancel 方法結(jié)束效驗(yàn)過(guò)程,并返回 error 整個(gè)效驗(yàn)過(guò)程結(jié)束,如果某個(gè) uid 效驗(yàn)結(jié)果為 false 則最終結(jié)果不返回該 uid

        MapReduce 使用注意事項(xiàng)

        • mapper 和 reducer 中都可以調(diào)用 cancel,參數(shù)為 error,調(diào)用后立即返回,返回結(jié)果為 nil, error

        • mapper 中如果不調(diào)用 writer.Write 則 item 最終不會(huì)被 reducer 聚合

        • reducer 中如果不調(diào)用 writer.Wirte 則返回結(jié)果為 nil, ErrReduceNoOutput

        • reducer 為單線(xiàn)程,所有 mapper 出來(lái)的結(jié)果在這里串行聚合

        實(shí)現(xiàn)原理分析:

        MapReduce 中首先通過(guò) buildSource 方法通過(guò)執(zhí)行 generate(參數(shù)為無(wú)緩沖 channel) 產(chǎn)生數(shù)據(jù),并返回?zé)o緩沖的 channel,mapper 會(huì)從該 channel 中讀取數(shù)據(jù)

        func buildSource(generate GenerateFunc) chan interface{} {    source := make(chan interface{})    go func() {        defer close(source)        generate(source)    }()
        return source}


        在 MapReduceWithSource 方法中定義了 cancel 方法,mapper 和 reducer 中都可以調(diào)用該方法,調(diào)用后主線(xiàn)程收到 close 信號(hào)會(huì)立馬返回

        cancel := once(func(err error) {    if err != nil {        retErr.Set(err)    } else {        // 默認(rèn)的error        retErr.Set(ErrCancelWithNil)    }
        drain(source) // 調(diào)用close(ouput)主線(xiàn)程收到Done信號(hào),立馬返回 finish()})

        在 mapperDispatcher 方法中調(diào)用了 executeMappers,executeMappers 消費(fèi) buildSource 產(chǎn)生的數(shù)據(jù),每一個(gè) item 都會(huì)起一個(gè) goroutine 單獨(dú)處理,默認(rèn)最大并發(fā)數(shù)為 16,可以通過(guò) WithWorkers 進(jìn)行設(shè)置

        var wg sync.WaitGroupdefer func() {    wg.Wait() // 保證所有的item都處理完成    close(collector)}()
        pool := make(chan lang.PlaceholderType, workers)writer := newGuardedWriter(collector, done) // 將mapper處理完的數(shù)據(jù)寫(xiě)入collectorfor { select { case <-done: // 當(dāng)調(diào)用了cancel會(huì)觸發(fā)立即返回 return case pool <- lang.Placeholder: // 控制最大并發(fā)數(shù) item, ok := <-input if !ok { <-pool return }
        wg.Add(1) go func() { defer func() { wg.Done() <-pool }()
        mapper(item, writer) // 對(duì)item進(jìn)行處理,處理完調(diào)用writer.Write把結(jié)果寫(xiě)入collector對(duì)應(yīng)的channel中 }() }}

        reducer 單 goroutine 對(duì)數(shù) mapper 寫(xiě)入 collector 的數(shù)據(jù)進(jìn)行處理,如果 reducer 中沒(méi)有手動(dòng)調(diào)用 writer.Write 則最終會(huì)執(zhí)行 finish 方法對(duì) output 進(jìn)行 close 避免死鎖

        go func() {    defer func() {        if r := recover(); r != nil {            cancel(fmt.Errorf("%v", r))        } else {            finish()        }    }()    reducer(collector, writer, cancel)    drain(collector)}()

        在該工具包中還提供了許多針對(duì)不同業(yè)務(wù)場(chǎng)景的方法,實(shí)現(xiàn)原理與 MapReduce 大同小異,感興趣的同學(xué)可以查看源碼學(xué)習(xí)

        • MapReduceVoid 功能和 MapReduce 類(lèi)似但沒(méi)有結(jié)果返回只返回 error

        • Finish 處理固定數(shù)量的依賴(lài),返回 error,有一個(gè) error 立即返回

        • FinishVoid 和 Finish 方法功能類(lèi)似,沒(méi)有返回值

        • Map 只做 generate 和 mapper 處理,返回 channel

        • MapVoid 和 Map 功能類(lèi)似,無(wú)返回

        本文主要介紹了 go-zero 框架中的 MapReduce 工具,在實(shí)際的項(xiàng)目中非常實(shí)用。用好工具對(duì)于提升服務(wù)性能和開(kāi)發(fā)效率都有很大的幫助,go-zero 框架中還提供了許多其他的實(shí)用工具,由于篇幅有限后續(xù)文章再做介紹,希望本篇文章能給大家?guī)?lái)幫助

        組件地址

        https://github.com/tal-tech/go-zero/blob/master/core/mr/mapreduce.go

        Example

        https://github.com/tal-tech/go-zero/blob/master/example/mapreduce/mr/mr.go

        微信交流群


        瀏覽 47
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            无遮挡边摸边吃奶边做视频免费 | 影音先锋成人电影在线观看 | 亚洲免费成人网站 | 操屄无码 | 女人张开腿免费视频 | 国产成人久久77777精品 九一综合色 | 色色色婷婷 | 亚洲一级天堂 | 在线黄色AV网站 | 成人中文字幕在线观看 |