1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Nacos6# Distro協(xié)議全量同步與校驗

        共 30747字,需瀏覽 62分鐘

         ·

        2021-06-28 08:20

        引言

        本文接著擼Distro協(xié)議,上文中分析了尋址模式。有了地址就要建立連接,有了連接就能通信了。集群之間都交互啥數(shù)據(jù)?本文就扒一扒全量同步和節(jié)點之間數(shù)據(jù)校驗。

        一、內(nèi)容提要

        節(jié)點間建立RCP連接

        • 訂閱了MembersChangeEvent事件,集群節(jié)點有變更能夠收到回調(diào)通知
        • 與集群中其他節(jié)點建立grpc連接并緩存到Map其中key格式為「Cluster-IP:Port」

        節(jié)點間校驗數(shù)據(jù)通信

        • 節(jié)點之間發(fā)送校驗數(shù)據(jù)是在全量同步后進行的
        • 發(fā)送校驗的頻率默認為5秒鐘一次
        • 校驗數(shù)據(jù)包括clientId和version,其中version為保留字段當前為0
        • 接受到校驗數(shù)據(jù)后如果緩存中存在該client表示校驗成功,同時更新保鮮時間,否則校驗失敗

        全量數(shù)據(jù)同步

        • 在節(jié)點啟動時會從集群中其他節(jié)點中的一個節(jié)點同步快照數(shù)據(jù)并緩存在Map中
        • 緩存的數(shù)據(jù)類型分類兩類分別為HTTP和gRPC
        • 具體數(shù)據(jù)即客戶端注冊節(jié)點信息含命名空間、分組名稱、服務(wù)名稱、節(jié)點Instance信息等
        • 集群中每個節(jié)點都擁有所有的快照數(shù)據(jù)

        二、節(jié)點間建立RPC連接

        節(jié)點之間要通信,需要建立連接。Nacos集群節(jié)點之間也不例外,下面看下Nacos是如何和集群之間建立連接的,以gRPC為例。

        Nacos中ClusterRpcClientProxy封裝了集群中節(jié)點之間的通道。

        @PostConstruct
        public void init() {
          try {
            // 注解@1
            NotifyCenter.registerSubscriber(this);
            // 注解@2
            List<Member> members = serverMemberManager.allMembersWithoutSelf(); 
            // 注解@3
            refresh(members);
            Loggers.CLUSTER
              .warn("[ClusterRpcClientProxy] success to refresh cluster rpc client on start up,members ={} ",
                    members);
          } catch (NacosException e) {
            Loggers.CLUSTER.warn("[ClusterRpcClientProxy] fail to refresh cluster rpc client,{} ", e.getMessage());
          }
        }

        注解@1 注冊自己訂閱MembersChangeEvent事件

        注解@2 獲取集群中的節(jié)點列表剔除自身節(jié)點

        注解@3 與各個節(jié)點建立rpc通道

        private void refresh(List<Member> members) throws NacosException {
           for (Member member : members) {

                if (MemberUtil.isSupportedLongCon(member)) {
                    // 注解@3.1
                    createRpcClientAndStart(member, ConnectionType.GRPC);
                }
            }
           Set<Map.Entry<String, RpcClient>> allClientEntrys = RpcClientFactory.getAllClientEntries();
            Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntrys.iterator();
            List<String> newMemberKeys = members.stream().filter(a -> MemberUtil.isSupportedLongCon(a))
                    .map(a -> memberClientKey(a)).collect(Collectors.toList());
            // 注解@3.2
            while (iterator.hasNext()) {
                Map.Entry<String, RpcClient> next1 = iterator.next();
                if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) {
                    Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", next1.getKey());
                    RpcClientFactory.getClient(next1.getKey()).shutdown();
                    iterator.remove();
                }
            }

        }

        注解@3.1 為集群中每個節(jié)點member創(chuàng)建rcp client

        注解@3.2 關(guān)閉舊的grpc連接

        private void createRpcClientAndStart(Member member, ConnectionType type) throws NacosException {
            Map<String, String> labels = new HashMap<String, String>(2);
            labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_CLUSTER);
            // 注解@3.1.1
            String memberClientKey = memberClientKey(member);
            RpcClient client = RpcClientFactory.createClusterClient(memberClientKey, type, labels);
            if (!client.getConnectionType().equals(type)) {
                Loggers.CLUSTER.info(",connection type changed,destroy client of member - > : {}", member);
                RpcClientFactory.destroyClient(memberClientKey);
                // 注解@3.1.2
                client = RpcClientFactory.createClusterClient(memberClientKey, type, labels);
            }

            if (client.isWaitInitiated()) {
                Loggers.CLUSTER.info("start a new rpc client to member - > : {}", member);

                // 注解@3.1.3
                client.serverListFactory(new ServerListFactory() {
                    @Override
                    public String genNextServer() {
                        return member.getAddress(); // 返回連接集群其他節(jié)點地址
                    }

                    @Override
                    public String getCurrentServer() {
                        return member.getAddress();
                    }

                    @Override
                    public List<String> getServerList() {
                        return Lists.newArrayList(member.getAddress());
                    }
                });
                // 注解@3.1.4
                client.start(); 
            }
        }

        注解@3.1.1 memberClientKey由「Cluster-IP:Port」構(gòu)成,例如:Cluster-1.2.3.4:2008

        注解@3.1.2 創(chuàng)建grpc client并緩存在 clientMap,key為memberClientKey 此時client的狀態(tài)為WAIT_INIT

        注解@3.1.3 集群中固定的某一臺節(jié)點

        注解@3.1.4  grpc連接集群中的member節(jié)點設(shè)置client的狀態(tài)RUNNING

        小結(jié): 在與Nacos集群其他節(jié)點建立連接的過程中做了兩件事情:@1.訂閱了MembersChangeEvent事件 @2.與集群中其他節(jié)點建立grpc連接并緩存到Map其中key格式為「Cluster-IP:Port」。

        三、節(jié)點間校驗數(shù)據(jù)通信

        節(jié)點之間建立rpc通道必然是為了互相之間能通信,其中一個通信是節(jié)點之間發(fā)送校驗數(shù)據(jù)。那為什么要發(fā)這些校驗數(shù)據(jù)?這些數(shù)據(jù)都是些什么內(nèi)容?下面咱就去扒一扒。

        在DistroProtocol的構(gòu)造函數(shù)中的最后一個行有一個startDistroTask(),主要分析startVerifyTask的邏輯。

        public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
                DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig)
         
        {
            this.memberManager = memberManager;
            this.distroComponentHolder = distroComponentHolder;
            this.distroTaskEngineHolder = distroTaskEngineHolder;
            this.distroConfig = distroConfig;
            startDistroTask();
        }
        private void startDistroTask() {
            // 單機模式直接返回
            if (EnvUtil.getStandaloneMode()) {
                isInitialized = true;
                return;
            }
            startVerifyTask();
            startLoadTask();
        }
        private void startVerifyTask() {
           // 注解@4
            GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTimedTask(memberManager, distroComponentHolder,
                    distroTaskEngineHolder.getExecuteWorkersManager()), distroConfig.getVerifyIntervalMillis());
        }

        注解@4  每隔5秒執(zhí)行,也就是節(jié)點之間發(fā)送校驗時間的默認頻率是5秒。

        可以通過配置參數(shù)「nacos.core.protocol.distro.data.verify_interval_ms」自定義。

        接著看DistroVerifyTimedTask的run方法。

        @Override
        public void run() {
            try {
                // 注解@5
                List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("server list is: {}", targetServer);
                }

                // 注解@6
                for (String each : distroComponentHolder.getDataStorageTypes()) {
                    verifyForDataStorage(each, targetServer);
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
            }
        }

        注解@5 拿到集群中其他節(jié)點

        注解@6 在Nacos server啟動時初始化時兩種類型HTTP和gRPC,本文以gRPC為例進行分析。

        private void verifyForDataStorage(String type, List<Member> targetServer) {
            // 注解@7
            DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
            // 注解@8
            if (!dataStorage.isFinishInitial()) {  // 未完成全量數(shù)據(jù)同步退出
                Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
                        dataStorage.getClass().getSimpleName());
                return;
            }

            //注解@9
            List<DistroData> verifyData = dataStorage.getVerifyData();
            if (null == verifyData || verifyData.isEmpty()) {
                return;
            }

            for (Member member : targetServer) {
                DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
                if (null == agent) {
                    continue;
                }
               // 注解@10
                executeTaskExecuteEngine.addTask(member.getAddress() + type,
                        new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
            }
        }

        注解@7 Nacos啟動時緩存在dataStorageMap中兩種類型處理器分別用于處理gRPC和HTTP通信方式。

        「Nacos:Naming:v2:ClientData->DistroClientDataProcessor」和 「com.alibaba.nacos.naming.iplist.->DistroDataStorageImpl」

        注解@8 當從其他節(jié)點同步了全部數(shù)據(jù)后,則完成了初始化finished initial,全量數(shù)據(jù)同步下小節(jié)分析。

        注解@9  獲取校驗的數(shù)據(jù),數(shù)據(jù)為由本節(jié)點負責的clientId列表。

        @Override
        public List<DistroData> getVerifyData() {
            List<DistroData> result = new LinkedList<>(); // 一組DistroData
            for (String each : clientManager.allClientId()) {
                Client client = clientManager.getClient(each);
                if (null == client || !client.isEphemeral()) { // 無效client或者非臨時節(jié)點
                    continue;
                }
                // 注解@9.1
                if (clientManager.isResponsibleClient(client)) {
                    // 注解@9.2
                    DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);
                    DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
                    DistroData data = new DistroData(distroKey,
                            ApplicationUtils.getBean(Serializer.class).serialize(verifyData))// 序列化校驗數(shù)據(jù)
                    data.setType(DataOperation.VERIFY);
                    result.add(data);
                }
            }
            return result;
        }

        注解@9.1 判斷client是否為本幾點負責的邏輯為ClientManagerDelegate#isResponsibleClient。即:屬于ConnectionBasedClient并且

        isNative為true表示該client是直連到該節(jié)點的。

        @Override
        public boolean isResponsibleClient(Client client) {
            return (client instanceof ConnectionBasedClient) && ((ConnectionBasedClient) client).isNative();
        }

        注解@9.2 構(gòu)造Verify Data 主要信息為clientId,還有一個版本信息作為保留字段,目前都是0。

        注解@10 向集群其他節(jié)點發(fā)送校驗數(shù)據(jù)DistroVerifyExecuteTask#run

        @Override
        public void run() {
            for (DistroData each : verifyData) {
                try {
                    if (transportAgent.supportCallbackTransport()) { // grpc支持回調(diào)
                        doSyncVerifyDataWithCallback(each);
                    } else { // http不支持回調(diào)使用同步
                        doSyncVerifyData(each);
                    }
                } catch (Exception e) {
                  //...
                }
            }
        }
        @Override
        public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
            if (isNoExistTarget(targetServer)) {
                callback.onSuccess();
            }
            DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
            Member member = memberManager.find(targetServer);
            try {
                DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
                        verifyData.getDistroKey().getResourceKey(), callback, member);
                // 注解@11         
                clusterRpcClientProxy.asyncRequest(member, request, wrapper); 
            } catch (NacosException nacosException) {
                callback.onFailed(nacosException);
            }
        }

        注解@11 向其他節(jié)點發(fā)送本節(jié)點負責的clientId信息

        那集群其他節(jié)點接收到校驗數(shù)據(jù)做什么處理呢?

        翻到DistroDataRequestHandler#handle,此處包含了處理校驗數(shù)據(jù)的邏輯。

        @Override
        public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
            try {
                switch (request.getDataOperation()) {
                    case VERIFY:
                        return handleVerify(request.getDistroData(), meta);
                    case SNAPSHOT:
                        return handleSnapshot();
                    case ADD:
                    case CHANGE:
                    case DELETE:
                        return handleSyncData(request.getDistroData());
                    case QUERY:
                        return handleQueryData(request.getDistroData());
                    default:
                        return new DistroDataResponse();
                }
            } catch (Exception e) {
                // ...
            }
        }
        private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {
            DistroDataResponse result = new DistroDataResponse();
           // 注解@12
            if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {
                result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");
            }
            return result;
        }

        注解@12 數(shù)據(jù)校驗,下面可以看到,如果緩存存在client則校驗成功,刷新client保鮮時間,否則校驗失敗。

        @Override
        public boolean verifyClient(String clientId) {
            ConnectionBasedClient client = clients.get(clientId);
            if (null != client) {
                client.setLastRenewTime();
                return true;
            }
            return false;
        }

        小結(jié): 節(jié)點之間發(fā)送校驗數(shù)據(jù)是在全量同步后進行的;發(fā)送校驗的頻率默認為5秒鐘一次;校驗數(shù)據(jù)包括clientId和version,其中version為保留字段當前為0;接受到校驗數(shù)據(jù)后如果緩存中存在該client表示校驗成功,同時更新保鮮時間,否則校驗失敗

        四、全量數(shù)據(jù)同步

        上文中提到在發(fā)送校驗數(shù)據(jù)之前需要先完成全量數(shù)據(jù)同步,先翻回DistroProtocol#startDistroTask()方法的startLoadTask()部分。

        private void startLoadTask() {
            DistroCallback loadCallback = new DistroCallback() {
                @Override
                public void onSuccess() {
                    isInitialized = true;
                }
                @Override
                public void onFailed(Throwable throwable) {
                    isInitialized = false;
                }
            };
            GlobalExecutor.submitLoadDataTask(
                    new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
        }

        DistroLoadDataTask#run

        @Override
        public void run() {
          try {
            load(); // 注解@13
            if (!checkCompleted()) { // 注解@14
              GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
            } else {
              loadCallback.onSuccess();
              Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
            }
          } catch (Exception e) {
            loadCallback.onFailed(e);
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
          }
        }

        注解@13 從集群中其他節(jié)點全量加載數(shù)據(jù)

        注解@14 如果沒有加載成功延遲30秒鐘重新執(zhí)行一次,可以通過參數(shù)「nacos.core.protocol.distro.data.load_retry_delay_ms」指定

        private void load() throws Exception {
            while (memberManager.allMembersWithoutSelf().isEmpty()) {
                Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
                TimeUnit.SECONDS.sleep(1);
            }
            while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
                Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
                TimeUnit.SECONDS.sleep(1);
            }
            for (String each : distroComponentHolder.getDataStorageTypes()) { // 注解@15
                if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
                    loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each)); // 加載快照
                }
            }
        }

        注解@15 為不同的數(shù)據(jù)類型緩存快照,此處有g(shù)RPC和http兩類數(shù)據(jù)類型。即:Nacos:Naming:v2:ClientData和com.alibaba.nacos.naming.iplist.

        private boolean loadAllDataSnapshotFromRemote(String resourceType) {
            DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
            DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
            if (null == transportAgent || null == dataProcessor) {
                return false;
            }
            for (Member each : memberManager.allMembersWithoutSelf()) { // 注解@16
                try {
                   DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
                    boolean result = dataProcessor.processSnapshot(distroData);
                    if (result) {
                        distroComponentHolder.findDataStorage(resourceType).finishInitial(); // 設(shè)置為完成初始化
                        return true;
                    }
                } catch (Exception e) {
                   
                }
            }
            return false;
        }

        注解@16 獲取集群中除了本節(jié)點的其他節(jié)點,循環(huán)重試獲取快照,直到有成功節(jié)點返回快照,成功后設(shè)置狀態(tài)狀態(tài)完成初始化「finishInitial」。

        @Override
        public DistroData getDatumSnapshot(String targetServer) {
            Member member = memberManager.find(targetServer);
            if (checkTargetServerStatusUnhealthy(member)) {
                throw new DistroException(
                        String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
            }
            DistroDataRequest request = new DistroDataRequest();
           // 設(shè)置請求操作為SNAPSHOT
            request.setDataOperation(DataOperation.SNAPSHOT); 
            try {
               // 發(fā)起請求快照數(shù)據(jù)
                Response response = clusterRpcClientProxy.sendRequest(member, request);
                if (checkResponse(response)) {
                    return ((DistroDataResponse) response).getDistroData();
                } else {
                    throw new DistroException(
                            String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",
                                    targetServer, response.getErrorCode(), response.getMessage()));
                }
            } catch (NacosException e) {
                throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
            }
        }

        接下來看看其他節(jié)點收到快照請求如何響應(yīng)的

        還是翻到DistroDataRequestHandler#handle,具體由handleSnapshot()方法來處理。

        private DistroDataResponse handleSnapshot() {
            DistroDataResponse result = new DistroDataResponse();
            DistroData distroData = distroProtocol.onSnapshot(DistroClientDataProcessor.TYPE);
            result.setDistroData(distroData);
            return result;
        }
        @Override
        public DistroData getDatumSnapshot() {
            List<ClientSyncData> datum = new LinkedList<>();
            // 把本節(jié)點的所有client數(shù)據(jù)全部封裝
            for (String each : clientManager.allClientId()) {
                Client client = clientManager.getClient(each);
                if (null == client || !client.isEphemeral()) {
                    continue;
                }
                datum.add(client.generateSyncData());
            }
            ClientSyncDatumSnapshot snapshot = new ClientSyncDatumSnapshot();
            snapshot.setClientSyncDataList(datum);
            byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(snapshot)// 序列化數(shù)據(jù)
            return new DistroData(new DistroKey(DataOperation.SNAPSHOT.name(), TYPE), data);
        }

        下面看下client數(shù)據(jù)信息,命名空間、分組名稱、服務(wù)名稱、節(jié)點Instance信息(IP、端口等等)。

        public ClientSyncData generateSyncData() {
            List<String> namespaces = new LinkedList<>();
            List<String> groupNames = new LinkedList<>();
            List<String> serviceNames = new LinkedList<>();
            List<InstancePublishInfo> instances = new LinkedList<>();
            for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {
                namespaces.add(entry.getKey().getNamespace());
                groupNames.add(entry.getKey().getGroup());
                serviceNames.add(entry.getKey().getName());
                instances.add(entry.getValue());
            }
            return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
        }

        小結(jié): 集群中每個節(jié)點都擁有所有的快照數(shù)據(jù);在節(jié)點啟動時會從集群中其他節(jié)點中的一個節(jié)點同步快照數(shù)據(jù)并緩存在Map中;緩存的數(shù)據(jù)類型分類兩類分別為HTTP和gRPC;具體數(shù)據(jù)即客戶端注冊節(jié)點信息含命名空間、分組名稱、服務(wù)名稱、節(jié)點Instance信息等。


        瀏覽 52
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            日韩中文字幕在线视频 | 啊啊啊啊啊啊操 | 777精品 | 又大又粗又硬又黑又黄毛片 | 久操网址 | 欧美黄色一级A片 | avtt香蕉久久 | 日韩怡春院 | 四虎视频国产精品免费 | 久久久久久逼 |