1. ETCD源碼分析Client端啟動流程分析

        共 10926字,需瀏覽 22分鐘

         ·

        2021-09-17 21:21

        ETCD源碼基于v3.5,在分析之前,需要搭建好源碼分析的環(huán)境。首先,從GitHub的倉庫中克隆下ETCD的源碼,再利用docker搭建我們的ETCD測試集群,命令如下:

        REGISTRY=quay.io/coreos/etcd
        NAME_1=etcd-node-0
        NAME_2=etcd-node-1
        NAME_3=etcd-node-2
        # IP在不同機器上不同,請查看docker的子網(wǎng)網(wǎng)段
        HOST_1=172.20.0.2
        HOST_2=172.20.0.3
        HOST_3=172.20.0.4
        PORT_1=2379
        PORT_2=12379
        PORT_3=22379
        PORT_C_1=2380
        PORT_C_2=12380
        PORT_C_3=22380
        CLUSTER=${NAME_1}=http://${HOST_1}:${PORT_C_1},${NAME_2}=http://${HOST_2}:${PORT_C_2},${NAME_3}=http://${HOST_3}:${PORT_C_3}
        # 需要保證目錄存在并可寫
        DATA_DIR=/var/folders/

        # 需要創(chuàng)建docker網(wǎng)絡(luò),用于模擬集群網(wǎng)絡(luò)分區(qū)的情況。
        docker network create etcd_cluster

        docker run \
        -p $PORT_1:$PORT_1 \
        -p $PORT_C_1:$PORT_C_1 \
        --volume "${DATA_DIR}${NAME_1}:/etcd-data" \
        --name ${NAME_1} \
        --network etcd_cluster \
        ${REGISTRY}:v3.5.0 \
        /usr/local/bin/etcd \
        --name ${NAME_1} \
        --data-dir /etcd-data \
        --listen-client-urls http://0.0.0.0:$PORT_1 \
        --advertise-client-urls http://$HOST_1:$PORT_1 \
        --listen-peer-urls http://0.0.0.0:$PORT_C_1 \
        --initial-advertise-peer-urls http://$HOST_1:$PORT_C_1 \
        --initial-cluster ${CLUSTER} \
        --initial-cluster-token tkn \
        --initial-cluster-state new \
        --log-level info \
        --logger zap \
        --log-outputs stderr

        docker run \
        -p $PORT_2:$PORT_2 \
        -p $PORT_C_2:$PORT_C_2 \
        --volume=${DATA_DIR}${NAME_2}:/etcd-data \
        --name ${NAME_2} \
        --network etcd_cluster \
        ${REGISTRY}:v3.5.0 \
        /usr/local/bin/etcd \
        --name ${NAME_2} \
        --data-dir /etcd-data \
        --listen-client-urls http://0.0.0.0:$PORT_2 \
        --advertise-client-urls http://$HOST_2:$PORT_2 \
        --listen-peer-urls http://0.0.0.0:$PORT_C_2 \
        --initial-advertise-peer-urls http://$HOST_2:$PORT_C_2 \
        --initial-cluster ${CLUSTER} \
        --initial-cluster-token tkn \
        --initial-cluster-state new \
        --log-level info \
        --logger zap \
        --log-outputs stderr

        docker run \
        -p $PORT_3:$PORT_3 \
        -p $PORT_C_3:$PORT_C_3 \
        --volume=${DATA_DIR}${NAME_3}:/etcd-data \
        --name ${NAME_3} \
        --network etcd_cluster \
        ${REGISTRY}:v3.5.0 \
        /usr/local/bin/etcd \
        --name ${NAME_3} \
        --data-dir /etcd-data \
        --listen-client-urls http://0.0.0.0:$PORT_3 \
        --advertise-client-urls http://$HOST_3:$PORT_3 \
        --listen-peer-urls http://0.0.0.0:$PORT_C_3 \
        --initial-advertise-peer-urls http://$HOST_3:$PORT_C_3 \
        --initial-cluster ${CLUSTER} \
        --initial-cluster-token tkn \
        --initial-cluster-state new \
        --log-level info \
        --logger zap \
        --log-outputs stderr

        復(fù)制代碼

        如上,我們創(chuàng)建了三個ETCD節(jié)點,組成了一個集群。接下來我們正式進入源碼分析流程。

        ETCD Client啟動流程分析

        我們先看一段啟動代碼樣例:

                cli, err := clientv3.New(clientv3.Config{
        Endpoints: exampleEndpoints(),
        DialTimeout: dialTimeout,
        })
        if err != nil {
        log.Fatal(err)
        }
        defer cli.Close()

        ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
        _, err = cli.Put(ctx, "sample_key", "sample_value")
        cancel()
        if err != nil {
        log.Fatal(err)
        }
        復(fù)制代碼

        一個最簡單的程序只需要提供集群的所有節(jié)點的ip和端口就能訪問,這里需要注意的是,一定要填寫ETCD集群的所有節(jié)點,這樣才能有故障轉(zhuǎn)移、負載均衡的特性?;蛘哌\行一個ETCD的代理節(jié)點(ETCD網(wǎng)關(guān))負責(zé)轉(zhuǎn)發(fā)請求,這樣只填寫代理節(jié)點ip即可,當(dāng)然性能上會有所損失。

        一、ETCD的Client啟動流程分析

        接下來我們看看Client是如何被創(chuàng)建出來的:


        func newClient(cfg *Config) (*Client, error) {
        // -----A-----
        ctx, cancel := context.WithCancel(baseCtx)
        client := &Client{
        conn: nil,
        cfg: *cfg,
        creds: creds,
        ctx: ctx,
        cancel: cancel,
        mu: new(sync.RWMutex),
        callOpts: defaultCallOpts,
        lgMu: new(sync.RWMutex),
        }
        // -----A-----

        // -----B-----
        client.resolver = resolver.New(cfg.Endpoints...)

        conn, err := client.dialWithBalancer()
        if err != nil {
        client.cancel()
        client.resolver.Close()
        return nil, err
        }
        client.conn = conn
        // -----B-----

        // -----C-----
        client.Cluster = NewCluster(client)
        client.KV = NewKV(client)
        client.Lease = NewLease(client)
        client.Watcher = NewWatcher(client)
        client.Auth = NewAuth(client)
        client.Maintenance = NewMaintenance(client)

        ...
        // -----C-----

        return client, nil
        }

        復(fù)制代碼
        A段代碼分析

        首先來看第A段代碼,其主要是初始化了一個client的實例,并把Config結(jié)構(gòu)體傳遞給它,那么Config中包含了什么配置項呢?

        type Config struct {
        // ETCD服務(wù)器地址,注意需要提供ETCD集群所有節(jié)點的ip
        Endpoints []string `json:"endpoints"`

        // 設(shè)置了此間隔時間,每 AutoSyncInterval 時間ETCD客戶端都會
        // 自動向ETCD服務(wù)端請求最新的ETCD集群的所有節(jié)點列表
        //
        // 默認為0,即不請求
        AutoSyncInterval time.Duration `json:"auto-sync-interval"`

        // 建立底層的GRPC連接的超時時間
        DialTimeout time.Duration `json:"dial-timeout"`

        // 這個配置和下面的 DialKeepAliveTimeoutt
        // 都是用來打開GRPC提供的 KeepAlive
        // 功能,作用主要是保持底層TCP連接的有效性,
        // 及時發(fā)現(xiàn)連接斷開的異常。
        //
        // 默認不打開 keepalive
        DialKeepAliveTime time.Duration `json:"dial-keep-alive-time"`

        // 客戶端發(fā)送 keepalive 的 ping 后,等待服務(wù)端的 ping ack 包的時長
        // 超過此時長會報 `translation is closed`
        DialKeepAliveTimeout time.Duration `json:"dial-keep-alive-timeout"`

        // 也是 keepalive 中的設(shè)置,
        // true則表示無論有沒有活躍的GRPC連接,都執(zhí)行ping
        // false的話,沒有活躍的連接也就不會發(fā)送ping。
        PermitWithoutStream bool `json:"permit-without-stream"`

        // 最大可發(fā)送字節(jié)數(shù),默認為2MB
        // 也就是說,我們ETCD的一條KV記錄最大不能超過2MB,
        // 如果要設(shè)置超過2MB的KV值,
        // 只修改這個配置也是無效的,因為ETCD服務(wù)端那邊的限制也是2MB。
        // 需要先修改ETCD服務(wù)端啟動參數(shù):`--max-request-bytes`,再修改此值。
        MaxCallSendMsgSize int

        // 最大可接收的字節(jié)數(shù),默認為`Int.MaxInt32`
        // 一般不需要改動
        MaxCallRecvMsgSize int

        // HTTPS證書配置
        TLS *tls.Config

        // 上下文,一般用于取消操作
        ctx.Context

        // 設(shè)置此值,會拒絕連接到低版本的ETCD
        // 什么是低版本呢?
        // 寫死了,小于v3.2的版本都是低版本。
        RejectOldCluster bool `json:"reject-old-cluster"`

        // GRPC 的連接配置,具體可參考GRPC文檔
        DialOptions []grpc.DialOption

        // zap包的Logger配置
        // ETCD用的日志包就是zap
        Logger *zap.Logger
        LogConfig *zap.Config

        ...
        }
        復(fù)制代碼

        還有一些常用配置項,比較簡單,這里就不再列出了。

        B段代碼分析

        本段是整個代碼的核心部分,主要做了兩件事:

        1. 創(chuàng)建了 resolver 用于解析ETCD服務(wù)的地址
          resolver(解析器)其實是grpc中的概念,比如:DNS解析器,域名轉(zhuǎn)化為真實的ip;服務(wù)注冊中心,也是一種把服務(wù)名轉(zhuǎn)化為真實ip的解析服務(wù)。

          具體的概念就不展開了,如果對grpc這方面比較感興趣,文末會推薦一個講的很好的grpc源碼分析博客。

          總之,etcd自己寫了一個解析器,就在resolver包里,這個解析器提供了以下幾個功能:

          1. 把Endpoints里的ETCD服務(wù)器地址傳給grpc框架,這里,因為ETCD自己實現(xiàn)的解析器不支持DNS解析,所以Endpoints只能是ip地址或者unix套接字。

          2. 告訴grpc,如果Endpoints有多個,負載均衡的策略是輪詢,這點很重要。

        2. dialWithBalancer() 建立了到ETCD的服務(wù)端鏈接

        func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
        creds := c.credentialsForEndpoint(c.Endpoints()[0])
        opts := append(dopts, grpc.WithResolvers(c.resolver))
        return c.dial(creds, opts...)
        }
        復(fù)制代碼

        這個用于建立到ETCD服務(wù)端的連接的方法名很有意思,雖然叫dialWithBalancer但內(nèi)部代碼很簡單,可以看到里面并無Balancer(負載均衡器)的出現(xiàn)。但其實因為上面說到,ETCD使用了自己的resolver,其內(nèi)部已經(jīng)寫好了負載均衡策略:round_robin。所以這里通過grpc.WithResolvers()把resolver傳進去,也是達到了負載均衡的效果。

        接下來進入dial(),這個方法雖然有些長,但整體邏輯是非常清晰的,省略無關(guān)代碼后,其內(nèi)部是做了以下幾件事:

        func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
        // 首先,ETCD通過這行代碼,向GRPC框架加入了一些自己的
        // 配置,比如:KeepAlive特性(配置里提到的配置項)、
        // TLS證書配置、還有最重要的重試策略。
        opts, err := c.dialSetupOpts(creds, dopts...)
        ...

        // context 的一段經(jīng)典樣例代碼
        // 問:如果我同時把非零的DialTimeout和
        // 帶超時的 context 傳給客戶端,
        // 到底以哪個超時為準(zhǔn)?
        // 答:這里新建了子context(dctx),父context和DialTimeout
        // 哪個先到deadline,就以哪個為準(zhǔn)。
        dctx := c.ctx
        if c.cfg.DialTimeout > 0 {
        var cancel context.CancelFunc
        // 同時包含父context和DialTimeout
        // 哪個先倒時間就以哪個為準(zhǔn)。
        dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
        defer cancel()
        }

        // 最終調(diào)用grpc.DialContext()建立連接
        conn, err := grpc.DialContext(dctx, target, opts...)
        ...
        return conn, nil
        }
        復(fù)制代碼
        C段代碼分析

        C段代碼無非就是做一些功能接口的初始化,比如:KV接口(用于提供Put、Get等)、Wathcer接口(用于監(jiān)聽Key)等,具體如何初始化到分析各接口再講。

        再回到啟動流程,初始化功能完畢后,就是getToken了,這個token是我們開啟了ETCD的賬號密碼功能后,通過賬號密碼獲取到了token,然后才能訪問ETCD提供的GRPC接口。

        然后是提供 RejectOldCluster 和 autoSync 功能,這個在介紹Config時也提過,這里就不再贅述了。

        ETCD Client重試策略分析

        對ETCD客戶端提供的自動重試策略的分析,是本文的重點。自動重試是ETCD能提供高可用特性的重要保證,在往下分析之前,一定要記住以下兩個概念:

        1. 自動重試不會在ETCD集群的同一節(jié)點上進行,這跟我們平常做的重試不同,因為前面說了ETCD是通過GRPC框架提供對集群訪問的負載均衡策略的,所以會輪詢的重試集群的每個節(jié)點。

        2. 自動重試只會重試一些特定的錯誤,比如:codes.Unavailable

        接下來,就讓我們來看看ETCD是如何利用GRPC提供的攔截器做自動重試的,學(xué)會這個,我們也能在自己的GRPC項目中用上同樣的套路:

            // 這段代碼在dialWithBalancer->dial->dialSetupOpts中
        rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))

        opts = append(opts,
        grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),
        grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(defaultUnaryMaxRetries), rrBackoff)),
        )

        復(fù)制代碼

        看以上的代碼,要自動重試只需兩步:

        1. 創(chuàng)建backoff函數(shù),也就是計算重試等待時間的函數(shù)。

        2. 通過WithXXXInterceptor(),注冊重試攔截器,這樣每次GRPC有請求都會回調(diào)該攔截器。

        這里,grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),我們看到Stream的重試攔截器,其最大重試次數(shù)設(shè)置為了0(withMax()),也就是不重試,這其實是故意為之,因為Client端的Stream重試不被支持。(Client端需要重試Stream,需要自己做單獨處理,不能通過攔截器。)

        那我們首先看看如何計算等待時間:

        // waitBetween 重試間隔時長
        // jitterFraction 隨機抖動率,
        // 比如:默認重試間隔為25ms,抖動率:0.1,
        // 那么實際重試間隔就在 25土2.5ms 之間。
        // attempt 實際重試了多少次
        func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration,
        jitterFraction float64) backoffFunc {

        return func(attempt uint) time.Duration {
        n := uint(len(c.Endpoints()))
        quorum := (n/2 + 1)
        if attempt%quorum == 0 {
        c.lg.Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
        return jitterUp(waitBetween, jitterFraction)
        }
        c.lg.Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
        return 0
        }
        }
        復(fù)制代碼

        可以看到roundRobinQuorumBackoff返回了一個閉包,內(nèi)部是重試間隔時長計算邏輯,這個邏輯說來也簡單:

            1. 若重試次數(shù)已經(jīng)達到集群的法定人數(shù)(quorum),則真正的計算間隔時長,
        間隔時長到期后,才進行重試。
        2. 否則,直接返回0,也就是馬上重試。
        復(fù)制代碼

        還記得剛才說的必須記住的兩個概念嗎?其中一點就是負載均衡策略寫死是輪詢,而這個重試邏輯一定要配合負載均衡是輪詢策略,達到的效果是:假如你訪問集群中的一臺節(jié)點失敗,可能是那臺節(jié)點出問題了,但如果整個集群是好的,這時候馬上重試,輪詢到下臺節(jié)點就行。

        但是,如果重試多次,集群大多數(shù)節(jié)點(法定人數(shù))都失敗了,那應(yīng)該是集群出問題了,這時候就需要計算間隔時間,等會兒再重試看看問題能不能解決。

        這里也可以看到ETCD的Client端,考慮的細節(jié)問題是非常多的,一個簡單的重試時間計算,也能進行邏輯上的小小優(yōu)化。

        那么重試攔截器又是如何實現(xiàn)的呢?接著看攔截器的相關(guān)代碼:

        func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClientInterceptor {
        ...
        // 如果最大重試次數(shù)設(shè)置為0,那就不重試。
        if callOpts.max == 0 {
        return invoker(ctx, method, req, reply, cc, grpcOpts...)
        }
        var lastErr error
        // 開始重試計數(shù)
        for attempt := uint(0); attempt < callOpts.max; attempt++ {
        // 計算重試間隔時間,并阻塞代碼,等待
        // 這里最終會調(diào)用到 roundRobinQuorumBackoff 來計算時間
        if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
        return err
        }

        // 再次重新執(zhí)行GRPC請求
        lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...)
        if lastErr == nil {
        // 重試成功,退出
        return nil
        }

        // 這段代碼分析了兩種情況
        // 1. 服務(wù)端返回了 Context Error(超時、被取消),直接重試
        // 2. 客戶端的 Context 也出現(xiàn)了Error
        if isContextError(lastErr) {
        if ctx.Err() != nil {
        // 客戶端本身的ctx也報錯了,不重試了,退出。
        return lastErr
        }
        // 服務(wù)端返回,直接重試
        continue
        }

        if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
        // 是AuthToken不正確,重新獲取Token
        gterr := c.getToken(ctx)
        ...
        continue
        }
        // 只有在特定錯誤才重試(code.Unavailable)
        // 否則返回Err,不重試。
        if !isSafeRetry(c.lg, lastErr, callOpts) {
        return lastErr
        }
        }
        return lastErr
        }
        }
        復(fù)制代碼

        代碼做了一定程度的精簡,但是主要流程都是保留的。

        由此,ETCD的整體重試流程也介紹完畢了。

        總結(jié)

        通過對ETCD整個啟動流程的代碼分析,我們可以總結(jié)出以下幾點:

        1. Endpoints 用來做負載均衡和重試策略計算法定人數(shù),一定要填寫集群的全部節(jié)點,
        或者打開AutoSync功能。

        2. ETCD 自己編寫了GRPC的resolver和balancer,可以借鑒到GRPC的相關(guān)項目中去。
        resolver只能解析ip和unix套接字,balancer策略寫死是輪詢策略。

        3. ETCD 重試流程只重試部分錯誤,所以不要完全指望ETCD的自動重試,一定要自己做好錯誤處理。

        復(fù)制代碼

        啟動流程圖,其中列出的函數(shù)就是整個啟動流程上的重要函數(shù):

        Config

        New

        newClient

        resolver.New

        dialWithBanlancer

        dial

        grpc.DialContext

        最后,本文涉及到一些GRPC的基礎(chǔ)知識,不了解的小伙伴可以去(blog.csdn.net/u011582922/… )這里看看,講的很詳細。


        作者:FengY_HYY
        鏈接:https://juejin.cn/post/7007992988672458765
        來源:掘金
        著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。



        瀏覽 68
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 免费三级搞逼毛片操逼黄色 | 男生j插入女生p | 婷婷综合无码 | 國產精品77777777777 | 无套内谢少妇高潮免费 |