1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Golang 從零到一開發(fā)實現(xiàn) RPC 框架(二)集群實現(xiàn)

        共 35068字,需瀏覽 71分鐘

         ·

        2022-07-01 01:33

        內(nèi)容提要

        在上一篇文章中分享了如何從零開始搭建一個 RPC 框架,并完成了 P2P 版本功能,本章繼續(xù)完善增加服務(wù)注冊發(fā)現(xiàn)和負載均衡實現(xiàn)集群能力。
        傳送門:
        RPC框架(一)
        本文主要內(nèi)容包括

        • RPC 接入服務(wù)注冊中心
        • 服務(wù)端實現(xiàn)平滑啟停

        • 客戶端實現(xiàn)服務(wù)發(fā)現(xiàn)

        • 客戶端實現(xiàn)負載均衡

        • 客戶端實現(xiàn)失敗策略

        服務(wù)注冊發(fā)現(xiàn)

        在 P2P 版本 RPC 中,客戶端要知道服務(wù)端的地址,并發(fā)起點對點連接,雖然滿足了服務(wù)調(diào)用的能力,但其弊端也顯而易見。為了保障服務(wù)高可用,通常會冗余部署多個服務(wù)端實例,而客戶端如何知道每一個服務(wù)實例的調(diào)用地址,服務(wù)端實例上下線又如何告知客戶端,這就需要引入服務(wù)自動注冊發(fā)現(xiàn)的能力。


        注冊發(fā)現(xiàn)是指客戶端具備動態(tài)發(fā)現(xiàn)服務(wù)端實例的能力,一般借助服務(wù)注冊中心來實現(xiàn),開源注冊中心有“Eurake”或“Nacos”等,本人之前專門有文章講過其實現(xiàn),對應(yīng)項目為 service_discovery,這里將以它為服務(wù)注冊中心,完成客戶端接入。

        具體參閱:

        服務(wù)注冊中心設(shè)計(一)

        服務(wù)注冊中心設(shè)計(二)

        首先定義客戶端接口,既要滿足服務(wù)提供者注冊/下線的能力,又要滿足服務(wù)消費者發(fā)現(xiàn)/觀察的能力。

        type Registry interface {
            Register(context.Context, *Instance) (context.CancelFunc, error)
            Fetch(context.Context, string) ([]*Instance, bool)
            Close() error
        }

        naming/naming.go

        定義 Discovery 繼承接口 Registry 實現(xiàn)與 “service_rpc” 接入,如果要使用“Eurake”或“Nacos”作為注冊中心,可以自定義擴展。
        type Discovery struct {
            once       *sync.Once
            conf       *Config
            ctx        context.Context
            cancelFunc context.CancelFunc
            //local cache
            mutex    sync.RWMutex
            apps     map[string]*FetchData
            registry map[string]struct{}
            //registry center node
            idx  uint64       //node index
            node atomic.Value //node list
        }
        func New(conf *Config) *Discovery {
            if len(conf.Nodes) == 0 {
                panic("conf nodes empty!")
            }
            ctx, cancel := context.WithCancel(context.Background())
            dis := &Discovery{
                ctx:        ctx,
                cancelFunc: cancel,
                conf:       conf,
                apps:       map[string]*FetchData{},
                registry:   map[string]struct{}{},
            }
            //from conf get node list
            dis.node.Store(conf.Nodes)
            go dis.updateNode()
            return dis
        }

        naming/discovery.go

        初始化 Discovery,默認從配置中獲取注冊中心節(jié)點(地址),并開啟單獨協(xié)程來定期更新維護節(jié)點變化。
        func (dis *Discovery) updateNode() {
            ticker := time.NewTicker(NodeInterval)
            defer ticker.Stop()
            for {
                select {
                case <-ticker.C:
                    uri := fmt.Sprintf(_nodesURL, dis.pickNode())
                    log.Println("discovery - request and update node, url:" + uri)
                    params := make(map[string]interface{})
                    params["env"] = dis.conf.Env
                    resp, err := HttpPost(uri, params)
                    if err != nil {
                        log.Println(err)
                        continue
                    }
                    res := ResponseFetch{}
                    err = json.Unmarshal([]byte(resp), &res)
                    if err != nil {
                        log.Println(err)
                        continue
                    }
                    newNodes := []string{}
                    for _, ins := range res.Data.Instances {
                        for _, addr := range ins.Addrs {
                          newNodes = append(newNodes, strings.TrimPrefix(addr, "http://"))
                        }
                    }
                    if len(newNodes) == 0 {
                        continue

                    }
                    curNodes := dis.node.Load().([]string)
                    if !compareNodes(curNodes, newNodes) {
                        dis.node.Store(newNodes)
                        log.Println("nodes list changed!", newNodes)
                        log.Println(newNodes)
                    } else {
                        log.Println("nodes list not change:", curNodes)
                    }
                }
            }
        }

        naming/discovery.go

        這里主要是開啟定時器,間隔時間(NodeInterval = 60*time.Second)去請求注冊中心接口(/api/nodes)獲取所有注冊中心服務(wù)器節(jié)點的地址,如果有變化則變更內(nèi)存維護的節(jié)點列表。
        //對比兩個數(shù)據(jù)是否完全相等
        func compareNodes(a, b []string) bool {
            if len(a) != len(b) {
                return false
            }
            mapB := make(map[string]struct{}, len(b))
            for _, node := range b {
                mapB[node] = struct{}{}
            }
            for _, node := range a {
                if _, ok := mapB[node]; !ok {
                    return false
                }
            }
            return true
        }

        naming/discovery.go

        實現(xiàn)服務(wù)注冊能力,先檢測本地緩存查看是否已注冊,沒有則請求注冊中心并發(fā)起注冊,異步維護一個定時任務(wù)來維持心跳(續(xù)約),如果發(fā)生終止則會調(diào)用取消接口從注冊中心注銷。

        func (dis *Discovery) Register(ctx context.Context, instance *Instance) (context.CancelFunc, error)
         {
            var err error
            //check local cache
            dis.mutex.Lock()
            if _, ok := dis.registry[instance.AppId]; ok {
                err = errors.New("instance duplicate register")
            } else {
                dis.registry[instance.AppId] = struct{}{} //register local cache
            }
            dis.mutex.Unlock()
            if err != nil {
                return nil, err 
            }
            //http register
            ctx, cancel := context.WithCancel(dis.ctx)
            if err = dis.register(instance); err != nil {
                //fail
                dis.mutex.Lock()
                delete(dis.registry, instance.AppId)
                dis.mutex.Unlock()
                return cancel, err
            }
            ch := make(chan struct{}, 1)
            cancelFunc := context.CancelFunc(func() {
                cancel()
                <-ch
            })
            //renew&cancel
            go func() {
                ticker := time.NewTicker(RenewInterval)
                defer ticker.Stop()
                for {
                    select {
                    case <-ticker.C:
                        if err := dis.renew(instance); err != nil {
                            dis.register(instance)
                        }
                    case <-ctx.Done():
                        dis.cancel(instance)
                        ch <- struct{}{}
                    }
                }

            }()
            return cancelFunc, nil
        }

        naming/discovery.go

        根據(jù)服務(wù)標(biāo)識(appId)獲取服務(wù)注冊信息,先從本地緩存中獲取,如不存在則從遠程注冊中心拉取并緩存下來。
        func (dis *Discovery) Fetch(ctx context.Context, appId string) ([]*Instance, bool) {
            //from local
            dis.mutex.RLock()
            fetchData, ok := dis.apps[appId]
            dis.mutex.RUnlock()
            if ok {
                log.Println("get data from local memory, appid:" + appId)
                return fetchData.Instances, ok
            }
            //from remote
            uri := fmt.Sprintf(_fetchURL, dis.pickNode())
            params := make(map[string]interface{})
            params["env"] = dis.conf.Env
            params["appid"] = appId
            params["status"] = 1 //up
            resp, err := HttpPost(uri, params)
            if err != nil {
                dis.switchNode()
                return nilfalse
            }
            res := ResponseFetch{}
            err = json.Unmarshal([]byte(resp), &res)
            if res.Code != 200 {
                return nilfalse
            }
            if err != nil {
                log.Println(err)
                return nilfalse
            }
            var result []*Instance
            for _, ins := range res.Data.Instances {
                result = append(result, ins)

            }
            if len(result) > 0 {
                ok = true
                dis.mutex.Lock()
                dis.apps[appId] = &res.Data
                dis.mutex.Unlock()
            }
            return result, ok
        }

        naming/discovery.go

        服務(wù)端改造

        服務(wù)端與注冊中心的交互包括服務(wù)啟動時會將自身服務(wù)信息(監(jiān)聽地址和端口)寫入注冊中心,開啟定時續(xù)約,在服務(wù)關(guān)閉退出時會注銷自身的注冊信息。

        服務(wù)啟動注冊

        首先在 RPCServer 中增加 Registry 用于綁定服務(wù)注冊中心實例。
        type RPCServer struct {
            listener   Listener
        ++  registry   naming.Registry
        }
        func NewRPCServer(option Option, registry naming.Registry) *RPCServer {
            return &RPCServer{
                listener: NewRPCListener(option),
        ++      registry: registry,
                option:   option,
            }
        }

        provider/server.go

        在服務(wù)端入口,實例化 RPCServer 時傳入注冊中心依賴。
        func main() {
            //服務(wù)注冊中心
            conf := &naming.Config{Nodes: config.RegistryAddrs, Env: config.Env}
            discovery := naming.New(conf)
            //注入依賴
            srv := provider.NewRPCServer(option, discovery)
        }

        demo/server/server.go

        服務(wù)啟動時將信息發(fā)布到注冊中心。考慮下“服務(wù)啟動”和"服務(wù)注冊"是否有先后順序? 也就是如何保障服務(wù)端“優(yōu)雅啟動”的問題。
        服務(wù)注冊到注冊中心后,客戶端即可刷到該服務(wù)的地址信息并發(fā)起連接調(diào)用,而此時如果服務(wù)端并沒有 ready ,就會導(dǎo)致服務(wù)調(diào)用失敗產(chǎn)生異常,所以一定要等到服務(wù)啟動完成后,再去暴露服務(wù)地址。像 Java 服務(wù)由于涉及到 JVM 預(yù)熱(將常用類字節(jié)碼轉(zhuǎn)為機器碼提高執(zhí)行效率),還會有延遲暴露的需求, Golang 服務(wù)可以不用考慮。
        func (svr *RPCServer) Run() {
            //先啟動后暴露服務(wù)
            err := svr.listener.Run()
            if err != nil {
                panic(err)
            }
            //register in discovery,注冊失?。ㄖ卦囀。┩顺龇?wù)
            err = svr.registerToNaming()
            if err != nil {
                svr.Close() //注冊失敗關(guān)閉服務(wù)
                panic(err)
            }   
        }
        provider/server.go
        服務(wù)注冊數(shù)據(jù)包括運行環(huán)境(env),服務(wù)標(biāo)識(appId),主機名(hostname),服務(wù)地址(addrs)等。向服務(wù)注冊中心發(fā)起注冊請求,失敗后會進行重試,如果重試失敗將會終止退出并關(guān)閉服務(wù)。
        func (svr *RPCServer) registerToNaming() error {
            instance := &naming.Instance{
                Env:      svr.option.Env,
                AppId:    svr.option.AppId,
                Hostname: svr.option.Hostname,
                Addrs:    svr.listener.GetAddrs(),
            }
            retries := maxRegisterRetry
            for retries > 0 {
                retries--
                cancel, err := svr.registry.Register(context.Background(), instance)
                if err == nil {
                    svr.cancelFunc = cancel
                    return nil
                }
            }
            return errors.New("register to naming server fail")
        }

        provider/server.go

        由于 registry.Register 已實現(xiàn)定時請求 renew,所以服務(wù)啟動后會自動開啟服務(wù)續(xù)約保持服務(wù)狀態(tài)。 


        做個測試,先啟動服務(wù)注冊中心(service_discovery),再運行 demo/server,通過配置不同端口和hostname,啟動兩個服務(wù),從服務(wù)注冊中心可以看到其結(jié)果。

        服務(wù)退出注銷

        服務(wù)關(guān)閉退出時需要將其從注冊中心一并移除,此時還需要考慮順序問題保障“優(yōu)雅退出。和啟動順序相反,啟動時先將服務(wù)啟動再去暴露給注冊中心,而退出時先從注冊中心注銷,再去關(guān)閉服務(wù)。想想看為什么?

        服務(wù)端從注冊中心注銷后,客戶端從注冊中心感知服務(wù)下線,就不再發(fā)送新連接和請求到該服務(wù)端實例。
        這里也可能有些問題,由于客戶端緩存機制導(dǎo)致客戶端感知服務(wù)端變化滯后,仍會有少許時間新連接和請求提交到當(dāng)前服務(wù)端。目前由于還未使用長鏈接管理,無法知曉有哪些客戶端連接。如果此時服務(wù)仍存活就正常處理返回,如果失敗可以
        返回“特殊失敗碼“,告知客戶端不要再請求了,服務(wù)端關(guān)閉了。

        func (svr *RPCServer) Close() {
            //從服務(wù)注冊中心注銷 
            if svr.cancelFunc != nil {
                svr.cancelFunc()
            }
            //關(guān)閉當(dāng)前服務(wù)
            if svr.listener != nil {
                svr.listener.Close()
            }
        }
        func (svr *RPCServer) registerToNaming() error {
        ++  cancel, err := svr.registry.Register(context.Background(), instance)
        ++  svr.cancelFunc = cancel
        }
        //注冊中心注冊 (naming/discovery.go)
        func (dis *Discovery) Register(ctx context.Context, instance *Instance) (context.CancelFunc, error)
         {
            ctx, cancel := context.WithCancel(dis.ctx)
            ch := make(chan struct{}, 1)
            cancelFunc := context.CancelFunc(func() {
                cancel()
                <-ch
            })
            for {
                    select {
                    case <-ctx.Done():
                        dis.cancel(instance) //服務(wù)注銷
                        ch <- struct{}{}
                    }
                }
            return cancelFunc, nil
        }
        provider/server.go
        協(xié)程間狀態(tài)同步通過 context.WithCancel 的方式,將服務(wù)注銷方法提供給外層協(xié)程調(diào)用。當(dāng)執(zhí)行 Close() 時,會執(zhí)行 cancelFun(),進而 cancel() 觸發(fā) ctx.Done(),完成 dis.cancel() ,將服務(wù)從注冊中心注銷。


        服務(wù)關(guān)閉時,除了不再接受新請求外,還需要考慮處理中的請求,不能因為服務(wù)關(guān)閉而強制中斷所有處理中的請求。根據(jù)請求所處階段不同,可以分別設(shè)置“擋板”,告知服務(wù)調(diào)用方當(dāng)前服務(wù)處于關(guān)閉流程,不再接受請求了。

        func main() {
            //...
            quit := make(chan os.Signal)
            signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
            <-quit
            srv.Shutdown()
        }

        demo/server/server.go

        在 main() 中捕獲服務(wù)退出系統(tǒng)信號,調(diào)用 Shutdown() 實現(xiàn)優(yōu)雅關(guān)閉,Shutdown 和 Close() 區(qū)別在于是否優(yōu)雅關(guān)閉。
        func (svr *RPCServer) Shutdown() {
            //從服務(wù)注冊中心注銷 
            if svr.cancelFunc != nil {
                svr.cancelFunc()
            }
            //關(guān)閉當(dāng)前服務(wù)
            if svr.listener != nil {
                svr.listener.Shutdown()
            }
        }

        provider/server.go

        上面說到根據(jù)請求所處階段不同,設(shè)置擋板,那么都有哪些階段呢?

        (1)首先是服務(wù)端接收到客戶端連接階段。如果此時發(fā)現(xiàn)服務(wù)關(guān)閉,設(shè)置擋板不再往下執(zhí)行,直接返回。

        func (l *RPCListener) Run() error {
            //... listen ...    
        ++    go l.acceptConn()  //accept conn
        }
        func (l *RPCListener) acceptConn() {
            for {
                conn, err := l.nl.Accept()
                if err != nil {
                    select {
                    case <-l.getDoneChan(): //擋板:server closed done
                        return 
                    default:
                    }
                    return
                }
                go l.handleConn(conn) //處理連接
            }
        }
        type RPCListener struct {
        ++    doneChan    chan struct{} //控制結(jié)束
        }
        func (l *RPCListener) getDoneChan() <-chan struct{} {
            return l.doneChan
        }
        //關(guān)閉時關(guān)閉通道
        func (l *RPCListener) Shutdown() {
            l.closeDoneChan()
        }
        //關(guān)閉通道
        func (l *RPCListener) closeDoneChan() {
            select {
            case <-l.doneChan:
            default:
                close(l.doneChan)
            }
        }

        provider/listener.go

        在服務(wù)關(guān)閉時,會關(guān)閉 doneChan 通道,這樣上游 acceptConn 就會收到  <-l.getDoneChan() 數(shù)據(jù),代表服務(wù)正在關(guān)閉,不繼續(xù)處理請求,起到擋板的作用。
        (2)接著是開始處理請求階段。優(yōu)先判斷服務(wù)是否正在關(guān)閉,關(guān)閉則退出處理流程。通過設(shè)置一個全局標(biāo)志位(shutdown),關(guān)閉服務(wù)時原子操作設(shè)置其值為 1,并通過判斷值是否為 1,來去攔截請求。
        func (l *RPCListener) handleConn(conn net.Conn) {
             //關(guān)閉擋板
        ++   if l.isShutdown() {
        ++       return
        ++   }
             for {
        ++      if l.isShutdown() {
        ++        return
        ++      }  
                //handle ...     
             }
        }
        type RPCListener struct {
        ++   shutdown    int32         //關(guān)閉處理中標(biāo)識位
        }
        //判斷是否關(guān)閉
        func (l *RPCListener) isShutdown() bool {
            return atomic.LoadInt32(&l.shutdown) == 1
        }
        //關(guān)閉邏輯
        func (l *RPCListener) Shutdown() {
            atomic.CompareAndSwapInt32(&l.shutdown, 01)
        }

        provider/listener.go

        (3)最后請求已進入服務(wù)實際處理階段。此時無法簡單設(shè)置擋板了,因為已經(jīng)是處理中,就應(yīng)該將請求處理完成。但我們需要確認有多少處理中的請求,并且確保這些請求全部執(zhí)行完成,然后就可以安全退出了。這有點像 WaitGroup 計數(shù)器,我們也維護一個處理中任務(wù)計數(shù)來達到目的。

        type RPCListener struct {
        ++    handlingNum int32         //處理中任務(wù)數(shù)
        }
        func (l *RPCListener) handleConn(conn net.Conn) {
           //...
           //處理中任務(wù)數(shù)+1
        ++ atomic.AddInt32(&l.handlingNum, 1)
           //任意退出都會導(dǎo)致處理中任務(wù)數(shù)-1
        ++ defer atomic.AddInt32(&l.handlingNum, -1)
           //read from network
           //decode
           //call local func
           //encode
           //send result
        }
        func (l *RPCListener) Shutdown() {
            atomic.CompareAndSwapInt32(&l.shutdown, 01)
        ++  for {
        ++      if atomic.LoadInt32(&l.handlingNum) == 0 {
        ++          break
        ++      }
        ++  }
            l.closeDoneChan()
        }

        provider/listener.go

        對于請求處理時間過長或者請求掛起的情況,可以加上超時時間控制,當(dāng)超過指定時間仍未結(jié)束,則強制退出應(yīng)用。

        客戶端改造

        實現(xiàn)服務(wù)發(fā)現(xiàn)

        客戶端通過 client_proxy 接入服務(wù)發(fā)現(xiàn),首先要在初始化時增加服務(wù)端的標(biāo)識(appId),通過服務(wù)注冊中心獲取該標(biāo)識對應(yīng)的實例列表。

        func NewClientProxy(appId string, option Option, registry naming.Registry) ClientProxy {
            cp := &RPCClientProxy{
                option:   option,
                failMode: option.FailMode,
                registry: registry,
            }
            servers, err := cp.discoveryService(context.Background(), appId)
            if err != nil {
                log.Fatal(err)
            }
            cp.servers = servers
            cp.loadBalance = LoadBalanceFactory(option.LoadBalanceMode, cp.servers)
            return cp
        }
        //獲取服務(wù)列表
        func (cp *RPCClientProxy) discoveryService(ctx context.Context, appId string) ([]string, error) {
            instances, ok := cp.registry.Fetch(ctx, appId)
            if !ok {
                return nil, errors.New("service not found")
            }
            var servers []string
            for _, instance := range instances {
                servers = append(servers, instance.Addrs...)
            }
            return servers, nil
        }

        consumer/client_proxy.go

        實現(xiàn)負載均衡

        獲取到的服務(wù)實例為一組地址列表,存在多個服務(wù)端實例,那么要選擇哪個實例發(fā)起調(diào)用?這里涉及路由策略和負載均衡策略。
        先不考慮路由分組的情況,現(xiàn)在我們要實現(xiàn)一個負載均衡器,保障可以合理地將請求分布到各個服務(wù)端實例節(jié)點中。常見的負載均衡策略有隨機、輪詢、加權(quán)輪詢、一致性 hash、最小連接數(shù)等。之前我們有一篇文章就分析過負載均衡,可以參閱:負載均衡原理
        type LoadBalanceMode int
        const (
            RandomBalance LoadBalanceMode = iota
            RoundRobinBalance
            WeightRoundRobinBalance
        )        
        type LoadBalance interface {
            Get() string
        }    
        func LoadBalanceFactory(mode LoadBalanceMode, servers []string) LoadBalance {
            switch mode {
            case RandomBalance:
                return newRandomBalance(servers)
            case RoundRobinBalance:
                return newRoundRobinBalance(servers)
            default:
                return newRandomBalance(servers)
            }
        }

        consumer/loadbalance.go

        通過工廠模式封裝負載均衡策略,首先實現(xiàn)隨機負載均衡策略。
        type randomBalance struct {
            servers []string
        }
        func newRandomBalance(servers []string) LoadBalance {
            return &randomBalance{servers: servers}
        }
        func (b *randomBalance) Get() string {
            rand.Seed(time.Now().Unix())
            return b.servers[rand.Intn(len(b.servers))]
        }

        consumer/loadbalance.go

        輪詢策略,主要是維護一個全局指針?biāo)饕?,?0 開始不斷遞增,超過數(shù)組長度后重置。其他策略這里就不展開了,可以根據(jù)需要自定義擴展。
        type roundRobinBalance struct {
            servers []string
            curIdx  int
        }
        func newRoundRobinBalance(servers []string) LoadBalance {
            return &roundRobinBalance{servers: servers, curIdx: 0}
        }
        func (b *roundRobinBalance) Get() string {
            lens := len(b.servers)
            if b.curIdx >= lens {
                b.curIdx = 0
            }
            server := b.servers[b.curIdx]
            b.curIdx = (b.curIdx + 1) % lens
            return server
        }

        consumer/loadbalance.go

        接下來,RPCClientProxy 即可通過負載均衡器選取出服務(wù)端實例節(jié)點,并發(fā)起連接。目前仍為短連接,如果使用長鏈接還會涉及到連接池管理,可以在后續(xù)迭代中加入。
        func (cp *RPCClientProxy) getConn() error {
            addr := strings.Replace(cp.loadBalance.Get(), cp.option.NetProtocol+"://"""-1)
            err := cp.client.Connect(addr) //長連接管理
            if err != nil {
                return err
            }
            return nil
        }

        consumer/client_proxy.go

        實現(xiàn)失敗策略

        執(zhí)行調(diào)用階段還要考慮失敗策略,即在調(diào)用服務(wù)端過程中出錯后如何處理?這里出錯通常是網(wǎng)絡(luò)原因或是服務(wù)端程序異常產(chǎn)生,而非業(yè)務(wù)錯誤。

        處理辦法可分為接受失敗或發(fā)起重試,接受失敗對應(yīng)策略就是 Failfast (快速失?。6卦嚳梢岳^續(xù)對上一次服務(wù)端地址發(fā)起調(diào)用 Failtry,它可以解決臨時性網(wǎng)絡(luò)失敗,但如果該實例服務(wù)端掛了再重試幾次也無濟于事,所以有另一個種策略 Failover,也就是故障轉(zhuǎn)移,換個服務(wù)端實例再試。


        type FailMode int 
        const (
            Failover FailMode = iota
            Failfast
            Failretry
        )

        consumer/fail.go

        如果獲取客戶端連接失敗, Failfast 策略就直接失敗返回。
        func (cp *RPCClientProxy) Call(ctx context.Context, servicePath string, stub interface{}, params ..
        .interface{})
         (interface{}, error)
         {
            service, err := NewService(servicePath)
            if err != nil {
                return nil, err 
            }
            err := cp.getConn()
            if err != nil && cp.failMode == Failfast { //快速失敗
                return nil, err 
            }
            //失敗策略
            switch cp.failMode {
            case Failretry:
            //...
            case Failover:
            //...
            case Failfast:
            //...
            }
            return nil, errors.New("call error")
        }

        consumer/client_proxy.go

        重試策略下,如果調(diào)用成功即直接退出,調(diào)用失敗會重試 retries 次,F(xiàn)ailover 會重新獲取一個新的連接。
        switch cp.failMode {
            case Failretry:
                retries := cp.option.Retries
                for retries > 0 {
                    retries--
                    if client != nil {
                        rs, err := cp.client.Invoke(ctx, service, stub, params...)
                        if err == nil {
                            return rs, nil
                        }
                    }
                }
            case Failover:
                retries := cp.option.Retries
                for retries > 0 {
                    retries--
                    if client != nil {
                        rs, err := cp.client.Invoke(ctx, service, stub, params...)
                        if err == nil {
                            return rs, nil
                        }
                    }
                    err = cp.getConn()
                }
            case Failfast:
                if client != nil {
                    rs, err := cp.client.Invoke(ctx, service, stub, params...)
                    if err == nil {
                        return rs, nil
                    }
                    return nil, err
                }

        consumer/client_proxy.go

        最后測試客戶端效果,發(fā)現(xiàn)服務(wù)端(UserService 服務(wù))兩個實例地址分別是 8898 和 8899,發(fā)起調(diào)用默認使用輪詢策略,依次完成調(diào)用并獲取到結(jié)果。
        如果服務(wù)端失敗,根據(jù) Failover 策略連接到另一實例并執(zhí)行成功。


        總結(jié)與補充

        這一版 RPC 框架具備了集群能力、負載均衡和簡單容錯能力,當(dāng)然離一個完善的微服務(wù)框架仍有不少距離,所以后續(xù)會陸續(xù)迭代,希望大家多多支持。


        文章完整代碼請關(guān)注公眾號  技術(shù)歲月 ,發(fā)送關(guān)鍵字 RPC 獲取,服務(wù)注冊中心代碼發(fā)送 注冊發(fā)現(xiàn) 獲取。
        瀏覽 122
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            污91香蕉视频 | 免费看的一级黄色毛片 | 看污的网站 | 91福利视频导航 | 国产草草 | 国产精品偷伦视频免费观看国产 | 成人AV秘 片一区二区三区 | 精品国产精品国产自在久国产 | 女被c视频 | 国产欧美一区二区精品婷婷 |