1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        企業(yè)級 RPC 框架 zRPC

        共 4407字,需瀏覽 9分鐘

         ·

        2020-10-21 12:32

        近期比較火的開源項目go-zero是一個集成了各種工程實踐的包含了 Web 和 RPC 協(xié)議的功能完善的微服務(wù)框架,今天我們就一起來分析一下其中的 RPC 部分zRPC。

        zRPC 底層依賴 gRPC,內(nèi)置了服務(wù)注冊、負載均衡、攔截器等模塊,其中還包括自適應(yīng)降載,自適應(yīng)熔斷,限流等微服務(wù)治理方案,是一個簡單易用的可直接用于生產(chǎn)的企業(yè)級 RPC 框架。

        zRPC 初探

        zRPC 支持直連和基于 etcd 服務(wù)發(fā)現(xiàn)兩種方式,我們以基于 etcd 做服務(wù)發(fā)現(xiàn)為例演示 zRPC 的基本使用:

        配置

        創(chuàng)建 hello.yaml 配置文件,配置如下:

        Name: hello.rpc           // 服務(wù)名
        ListenOn: 127.0.0.1:9090 // 服務(wù)監(jiān)聽地址
        Etcd:
        Hosts:
        - 127.0.0.1:2379 // etcd服務(wù)地址
        Key: hello.rpc // 服務(wù)注冊key
        創(chuàng)建 proto 文件

        創(chuàng)建 hello.proto 文件,并生成對應(yīng)的 go 代碼

        syntax = "proto3";

        package pb;

        service Greeter {
        rpc SayHello (HelloRequest) returns (HelloReply) {}
        }

        message HelloRequest {
        string name = 1;
        }

        message HelloReply {
        string message = 1;
        }

        生成 go 代碼

        protoc --go_out=plugins=grpc:. hello.proto
        Server 端
        package main

        import (
        "context"
        "flag"
        "log"

        "example/zrpc/pb"

        "github.com/tal-tech/go-zero/core/conf"
        "github.com/tal-tech/go-zero/zrpc"
        "google.golang.org/grpc"
        )

        type Config struct {
        zrpc.RpcServerConf
        }

        var cfgFile = flag.String("f", "./hello.yaml", "cfg file")

        func main() {
        flag.Parse()

        var cfg Config
        conf.MustLoad(*cfgFile, &cfg)

        srv, err := zrpc.NewServer(cfg.RpcServerConf, func(s *grpc.Server) {
        pb.RegisterGreeterServer(s, &Hello{})
        })
        if err != nil {
        log.Fatal(err)
        }
        srv.Start()
        }

        type Hello struct{}

        func (h *Hello) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
        return &pb.HelloReply{Message: "hello " + in.Name}, nil
        }
        Client 端
        package main

        import (
        "context"
        "log"

        "example/zrpc/pb"

        "github.com/tal-tech/go-zero/core/discov"
        "github.com/tal-tech/go-zero/zrpc"
        )

        func main() {
        client := zrpc.MustNewClient(zrpc.RpcClientConf{
        Etcd: discov.EtcdConf{
        Hosts: []string{"127.0.0.1:2379"},
        Key: "hello.rpc",
        },
        })

        conn := client.Conn()
        hello := pb.NewGreeterClient(conn)
        reply, err := hello.SayHello(context.Background(), &pb.HelloRequest{Name: "go-zero"})
        if err != nil {
        log.Fatal(err)
        }
        log.Println(reply.Message)
        }

        啟動服務(wù),查看服務(wù)是否注冊:

        ETCDCTL_API=3 etcdctl get hello.rpc --prefix

        顯示服務(wù)已經(jīng)注冊:

        hello.rpc/7587849401504590084
        127.0.0.1:9090

        運行客戶端即可看到輸出:

        hello go-zero

        這個例子演示了 zRPC 的基本使用,可以看到通過 zRPC 構(gòu)建 RPC 服務(wù)非常簡單,只需要很少的幾行代碼,接下來我們繼續(xù)進行探索

        zRPC 原理分析

        下圖展示 zRPC 的架構(gòu)圖和主要組成部分



        zRPC 主要有以下幾個模塊組成:

        • discov: 服務(wù)發(fā)現(xiàn)模塊,基于 etcd 實現(xiàn)服務(wù)發(fā)現(xiàn)功能

        • resolver: 服務(wù)注冊模塊,實現(xiàn)了 gRPC 的 resolver.Builder 接口并注冊到 gRPC

        • interceptor: 攔截器,對請求和響應(yīng)進行攔截處理

        • balancer: 負載均衡模塊,實現(xiàn)了 p2c 負載均衡算法,并注冊到 gRPC

        • client: zRPC 客戶端,負責(zé)發(fā)起請求

        • server: zRPC 服務(wù)端,負責(zé)處理請求

        這里介紹了 zRPC 的主要組成模塊和每個模塊的主要功能,其中 resolver 和 balancer 模塊實現(xiàn)了 gRPC 開放的接口,實現(xiàn)了自定義的 resolver 和 balancer,攔截器模塊是整個 zRPC 的功能重點,自適應(yīng)降載、自適應(yīng)熔斷、prometheus 服務(wù)指標收集等功能都在這里實現(xiàn)

        Interceptor 模塊

        gRPC 提供了攔截器功能,主要是對請求前后進行額外處理的攔截操作,其中攔截器包含客戶端攔截器和服務(wù)端攔截器,又分為一元 (Unary) 攔截器和流 (Stream) 攔截器,這里我們主要講解一元攔截器,流攔截器同理。



        客戶端攔截器定義如下:

        type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

        其中 method 為方法名,req,reply 分別為請求和響應(yīng)參數(shù),cc 為客戶端連接對象,invoker 參數(shù)是真正執(zhí)行 rpc 方法的 handler 其實在攔截器中被調(diào)用執(zhí)行

        服務(wù)端攔截器定義如下:

        type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

        其中 req 為請求參數(shù),info 中包含了請求方法屬性,handler 為對 server 端方法的包裝,也是在攔截器中被調(diào)用執(zhí)行

        zRPC 中內(nèi)置了豐富的攔截器,其中包括自適應(yīng)降載、自適應(yīng)熔斷、權(quán)限驗證、prometheus 指標收集等等,由于攔截器較多,篇幅有限沒法所有的攔截器給大家一一解析,這里我們主要分析兩個,自適應(yīng)熔斷和 prometheus 服務(wù)監(jiān)控指標收集:

        內(nèi)置攔截器分析

        自適應(yīng)熔斷 (breaker)

        當客戶端向服務(wù)端發(fā)起請求,客戶端會記錄服務(wù)端返回的錯誤,當錯誤達到一定的比例,客戶端會自行的進行熔斷處理,丟棄掉一定比例的請求以保護下游依賴,且可以自動恢復(fù)。zRPC 中自適應(yīng)熔斷遵循《Google SRE》中過載保護策略,算法如下:



        requests: 總請求數(shù)量

        accepts: 正常請求數(shù)量

        K: 倍值 (Google SRE 推薦值為 2)

        可以通過修改 K 的值來修改熔斷發(fā)生的激進程度,降低 K 的值會使得自適應(yīng)熔斷算法更加激進,增加 K 的值則自適應(yīng)熔斷算法變得不再那么激進

        熔斷攔截器定義如下:

        func BreakerInterceptor(ctx context.Context, method string, req, reply interface{},
        cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        // target + 方法名
        breakerName := path.Join(cc.Target(), method)
        return breaker.DoWithAcceptable(breakerName, func() error {
        // 真正執(zhí)行調(diào)用
        return invoker(ctx, method, req, reply, cc, opts...)
        }, codes.Acceptable)
        }

        accept 方法實現(xiàn)了 Google SRE 過載保護算法,判斷否進行熔斷

        func (b *googleBreaker) accept() error {
        // accepts為正常請求數(shù),total為總請求數(shù)
        accepts, total := b.history()
        weightedAccepts := b.k * float64(accepts)
        // 算法實現(xiàn)
        dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
        if dropRatio <= 0 {
        return nil
        }
        // 是否超過比例
        if b.proba.TrueOnProba(dropRatio) {
        return ErrServiceUnavailable
        }

        return nil
        }

        doReq 方法首先判斷是否熔斷,滿足條件直接返回 error(circuit breaker is open),不滿足條件則對請求數(shù)進行累加

        func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
        if err := b.accept(); err != nil {
        if fallback != nil {
        return fallback(err)
        } else {
        return err
        }
        }

        defer func() {
        if e := recover(); e != nil {
        b.markFailure()
        panic(e)
        }
        }()

        // 此處執(zhí)行RPC請求
        err := req()
        // 正常請求total和accepts都會加1
        if acceptable(err) {
        b.markSuccess()
        } else {
        // 請求失敗只有total會加1
        b.markFailure()
        }

        return err
        }
        prometheus 指標收集

        服務(wù)監(jiān)控是了解服務(wù)當前運行狀態(tài)以及變化趨勢的重要手段,監(jiān)控依賴于服務(wù)指標的收集,通過 prometheus 進行監(jiān)控指標的收集是業(yè)界主流方案,zRPC 中也采用了 prometheus 來進行指標的收集

        prometheus 攔截器定義如下:

        這個攔截器主要是對服務(wù)的監(jiān)控指標進行收集,這里主要是對 RPC 方法的耗時和調(diào)用錯誤進行收集,這里主要使用了 Prometheus 的 Histogram 和 Counter 數(shù)據(jù)類型

        func UnaryPrometheusInterceptor() grpc.UnaryServerInterceptor {
        return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
        interface{}, error) {
        // 執(zhí)行前記錄一個時間
        startTime := timex.Now()
        resp, err := handler(ctx, req)
        // 執(zhí)行后通過Since算出執(zhí)行該調(diào)用的耗時
        metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod)
        // 方法對應(yīng)的錯誤碼
        metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err))))
        return resp, err
        }
        }

        添加自定義攔截器

        除了內(nèi)置了豐富的攔截器之外,zRPC 同時支持添加自定義攔截器

        Client 端通過 AddInterceptor 方法添加一元攔截器:

        func (rc *RpcClient) AddInterceptor(interceptor grpc.UnaryClientInterceptor) {
        rc.client.AddInterceptor(interceptor)
        }

        Server 端通過 AddUnaryInterceptors 方法添加一元攔截器:

        func (rs *RpcServer) AddUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) {
        rs.server.AddUnaryInterceptors(interceptors...)
        }

        resolver 模塊

        zRPC 服務(wù)注冊架構(gòu)圖:



        zRPC 中自定義了 resolver 模塊,用來實現(xiàn)服務(wù)的注冊功能。zRPC 底層依賴 gRPC,在 gRPC 中要想自定義 resolver 需要實現(xiàn) resolver.Builder 接口:

        type Builder interface {
        Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
        Scheme() string
        }

        其中 Build 方法返回 Resolver,Resolver 定義如下:

        type Resolver interface {
        ResolveNow(ResolveNowOptions)
        Close()
        }

        在 zRPC 中定義了兩種 resolver,direct 和 discov,這里我們主要分析基于 etcd 做服務(wù)發(fā)現(xiàn)的 discov,自定義的 resolver 需要通過 gRPC 提供了 Register 方法進行注冊代碼如下:

        func RegisterResolver() {
        resolver.Register(&dirBuilder)
        resolver.Register(&disBuilder)
        }

        當我們啟動我們的 zRPC Server 的時候,調(diào)用 Start 方法,會像 etcd 中注冊對應(yīng)的服務(wù)地址:

        func (ags keepAliveServer) Start(fn RegisterFn) error {
        // 注冊服務(wù)地址
        if err := ags.registerEtcd(); err != nil {
        return err
        }
        // 啟動服務(wù)
        return ags.Server.Start(fn)
        }

        當我們啟動 zRPC 客戶端的時候,在 gRPC 內(nèi)部會調(diào)用我們自定義 resolver 的 Build 方法,zRPC 通過在 Build 方法內(nèi)調(diào)用執(zhí)行了 resolver.ClientConn 的 UpdateState 方法,該方法會把服務(wù)地址注冊到 gRPC 客戶端內(nèi)部:

        func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
        resolver.Resolver, error) {
        hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
        return r == EndpointSepChar
        })
        // 服務(wù)發(fā)現(xiàn)
        sub, err := discov.NewSubscriber(hosts, target.Endpoint)
        if err != nil {
        return nil, err
        }

        update := func() {
        var addrs []resolver.Address
        for _, val := range subset(sub.Values(), subsetSize) {
        addrs = append(addrs, resolver.Address{
        Addr: val,
        })
        }
        // 向gRPC注冊服務(wù)地址
        cc.UpdateState(resolver.State{
        Addresses: addrs,
        })
        }
        // 監(jiān)聽
        sub.AddListener(update)
        update()
        // 返回自定義的resolver.Resolver
        return &nopResolver{cc: cc}, nil
        }

        在 discov 中,通過調(diào)用 load 方法從 etcd 中獲取指定服務(wù)的所有地址:

        func (c *cluster) load(cli EtcdClient, key string) {
        var resp *clientv3.GetResponse
        for {
        var err error
        ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout)
        // 從etcd中獲取指定服務(wù)的所有地址
        resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix())
        cancel()
        if err == nil {
        break
        }

        logx.Error(err)
        time.Sleep(coolDownInterval)
        }

        var kvs []KV
        c.lock.Lock()
        for _, ev := range resp.Kvs {
        kvs = append(kvs, KV{
        Key: string(ev.Key),
        Val: string(ev.Value),
        })
        }
        c.lock.Unlock()

        c.handleChanges(key, kvs)
        }

        并通過 watch 監(jiān)聽服務(wù)地址的變化:

        func (c *cluster) watch(cli EtcdClient, key string) {
        rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
        for {
        select {
        case wresp, ok := <-rch:
        if !ok {
        logx.Error("etcd monitor chan has been closed")
        return
        }
        if wresp.Canceled {
        logx.Error("etcd monitor chan has been canceled")
        return
        }
        if wresp.Err() != nil {
        logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
        return
        }
        // 監(jiān)聽變化通知更新
        c.handleWatchEvents(key, wresp.Events)
        case <-c.done:
        return
        }
        }
        }

        這部分主要介紹了 zRPC 中是如何自定義的 resolver,以及基于 etcd 的服務(wù)發(fā)現(xiàn)原理,通過這部分的介紹大家可以了解到 zRPC 內(nèi)部服務(wù)注冊發(fā)現(xiàn)的原理,源代碼比較多只是粗略的從整個流程上進行了分析,如果大家對 zRPC 的源碼比較感興趣可以自行進行學(xué)習(xí)

        balancer 模塊

        負載均衡原理圖:



        避免過載是負載均衡策略的一個重要指標,好的負載均衡算法能很好的平衡服務(wù)端資源。常用的負載均衡算法有輪訓(xùn)、隨機、Hash、加權(quán)輪訓(xùn)等。但為了應(yīng)對各種復(fù)雜的場景,簡單的負載均衡算法往往表現(xiàn)的不夠好,比如輪訓(xùn)算法當服務(wù)響應(yīng)時間變長就很容易導(dǎo)致負載不再平衡, 因此 zRPC 中自定義了默認負載均衡算法 P2C(Power of Two Choices),和 resolver 類似,要想自定義 balancer 也需要實現(xiàn) gRPC 定義的 balancer.Builder 接口,由于和 resolver 類似這里不再帶大家一起分析如何自定義 balancer,感興趣的朋友可以查看 gRPC 相關(guān)的文檔來進行學(xué)習(xí)

        注意,zRPC 是在客戶端進行負載均衡,常見的還有通過 nginx 中間代理的方式

        zRPC 框架中默認的負載均衡算法為 P2C,該算法的主要思想是:

        1. 從可用節(jié)點列表中做兩次隨機選擇操作,得到節(jié)點 A、B

        2. 比較 A、B 兩個節(jié)點,選出負載最低的節(jié)點作為被選中的節(jié)點

        偽代碼如下:



        主要算法邏輯在 Pick 方法中實現(xiàn):

        func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
        conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
        p.lock.Lock()
        defer p.lock.Unlock()

        var chosen *subConn
        switch len(p.conns) {
        case 0:
        return nil, nil, balancer.ErrNoSubConnAvailable
        case 1:
        chosen = p.choose(p.conns[0], nil)
        case 2:
        chosen = p.choose(p.conns[0], p.conns[1])
        default:
        var node1, node2 *subConn
        for i := 0; i < pickTimes; i++ {
        // 隨機數(shù)
        a := p.r.Intn(len(p.conns))
        b := p.r.Intn(len(p.conns) - 1)
        if b >= a {
        b++
        }
        // 隨機獲取所有節(jié)點中的兩個節(jié)點
        node1 = p.conns[a]
        node2 = p.conns[b]
        // 效驗節(jié)點是否健康
        if node1.healthy() && node2.healthy() {
        break
        }
        }
        // 選擇其中一個節(jié)點
        chosen = p.choose(node1, node2)
        }

        atomic.AddInt64(&chosen.inflight, 1)
        atomic.AddInt64(&chosen.requests, 1)
        return chosen.conn, p.buildDoneFunc(chosen), nil
        }

        choose 方法對隨機選擇出來的節(jié)點進行負載比較從而最終確定選擇哪個節(jié)點

        func (p *p2cPicker) choose(c1, c2 *subConn) *subConn {
        start := int64(timex.Now())
        if c2 == nil {
        atomic.StoreInt64(&c1.pick, start)
        return c1
        }

        if c1.load() > c2.load() {
        c1, c2 = c2, c1
        }

        pick := atomic.LoadInt64(&c2.pick)
        if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) {
        return c2
        } else {
        atomic.StoreInt64(&c1.pick, start)
        return c1
        }
        }

        上面主要介紹了 zRPC 默認負載均衡算法的設(shè)計思想和代碼實現(xiàn),那自定義的 balancer 是如何注冊到 gRPC 的呢,resolver 提供了 Register 方法來進行注冊,同樣 balancer 也提供了 Register 方法來進行注冊:

        func init() {
        balancer.Register(newBuilder())
        }

        func newBuilder() balancer.Builder {
        return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
        }

        注冊 balancer 之后 gRPC 怎么知道使用哪個 balancer 呢?這里我們需要使用配置項進行配置,在 NewClient 的時候通過 grpc.WithBalancerName 方法進行配置:

        func NewClient(target string, opts ...ClientOption) (*client, error) {
        var cli client
        opts = append(opts, WithDialOption(grpc.WithBalancerName(p2c.Name)))
        if err := cli.dial(target, opts...); err != nil {
        return nil, err
        }

        return &cli, nil
        }

        這部分主要介紹了 zRPC 中內(nèi)中的負載均衡算法的實現(xiàn)原理以及具體的實現(xiàn)方式,之后介紹了 zRPC 是如何注冊自定義的 balancer 以及如何選擇自定義的 balancer,通過這部分大家應(yīng)該對負載均衡有了更進一步的認識

        總結(jié)

        首先,介紹了 zRPC 的基本使用方法,可以看到 zRPC 使用非常簡單,只需要少數(shù)幾行代碼就可以構(gòu)建高性能和自帶服務(wù)治理能力的 RPC 服務(wù),當然這里沒有面面俱到的介紹 zRPC 的基本使用,大家可以查看相關(guān)文檔進行學(xué)習(xí)

        接著,介紹了 zRPC 的幾個重要組成模塊以及其實現(xiàn)原理,并分析了部分源碼。攔截器模塊是整個 zRPC 的重點,其中內(nèi)置了豐富的功能,像熔斷、監(jiān)控、降載等等也是構(gòu)建高可用微服務(wù)必不可少的。resolver 和 balancer 模塊自定義了 gRPC 的 resolver 和 balancer,通過該部分可以了解到整個服務(wù)注冊與發(fā)現(xiàn)的原理以及如何構(gòu)建自己的服務(wù)發(fā)現(xiàn)系統(tǒng),同時自定義負載均衡算法也變得不再神秘

        最后,zRPC 是一個經(jīng)歷過各種工程實踐的 RPC 框架,不論是想要用于生產(chǎn)還是學(xué)習(xí)其中的設(shè)計模式都是一個不可多得的開源項目。希望通過這篇文章的介紹大家能夠進一步了解 zRPC

        項目地址

        https://github.com/tal-tech/go-zero

        框架地址

        https://github.com/tal-tech/go-zero/tree/master/zrpc

        文檔地址

        https://www.yuque.com/tal-tech/go-zero/rhakzy

        微信交流群


        瀏覽 58
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            国产三级视频网站 | 黄网在线观看免费 | 美女无遮挡撒尿 | 免费男阳茎伸入女阳道视频 | 酒吧操逼 | 亚洲免费人成在线观看网站 | 影音先锋 一区二区三区 | 夸克看成人片一级A片 | 91成人电影 | 在线观看免费无码视频 |