1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Nacos5# Distro協(xié)議尋址模式

        共 16172字,需瀏覽 33分鐘

         ·

        2021-06-23 12:12


        引言

        在Nacos服務(wù)端分析服務(wù)注冊(cè)邏輯,就繞不開Distro協(xié)議。該協(xié)議為臨時(shí)一致性協(xié)議,數(shù)據(jù)存儲(chǔ)在緩存中。阿里專門為注冊(cè)中心而設(shè)計(jì)的。后面文章逐步還原該協(xié)議承擔(dān)的職責(zé),本文先分析尋址模式。

        一、內(nèi)容提要

        尋址概念

        • 尋址是指如何發(fā)現(xiàn)Nacos集群中節(jié)點(diǎn)變化的,當(dāng)檢測(cè)到變化時(shí)能后及時(shí)更新節(jié)點(diǎn)信息

        尋址模式

        • Nacos支持兩種尋址模式分別為「文件尋址」和「地址服務(wù)器尋址」
        • 默認(rèn)為文件尋址,可以通過參數(shù)「nacos.core.member.lookup.type」設(shè)置取值為「file」或者「address-server」
        • 文件尋址路徑默認(rèn)為 「${user.home}/nacos/conf/cluster.conf」
        • 文件尋址cluster.conf配置文件的內(nèi)容格式為「ip1:port,ip2:port」
        • 地址服務(wù)器尋址默認(rèn)為:http://jmenv.tbsite.net:8080/serverlist;其中域名、端口、url均可自定義
        • 檢測(cè)到集群節(jié)點(diǎn)變更時(shí)會(huì)更新緩存并發(fā)布MembersChangeEvent事件
        • 為防止新節(jié)點(diǎn)沒有初始化好,當(dāng)檢測(cè)到新節(jié)點(diǎn)加入時(shí)先設(shè)置該節(jié)點(diǎn)狀態(tài)為DOWN,該節(jié)點(diǎn)不參與通信
        • 過幾秒通過節(jié)點(diǎn)之間通信將已初始化的新節(jié)點(diǎn)狀態(tài)由DOWN設(shè)置為UP,該節(jié)點(diǎn)正式參與通信

        二、尋址初始化

        尋址是指如何發(fā)現(xiàn)Nacos集群中節(jié)點(diǎn)變化的,當(dāng)檢測(cè)到變化時(shí)能后及時(shí)更新節(jié)點(diǎn)信息。Nacos提供了兩種尋址模式,分別為 文件尋址地址服務(wù)器尋址。如果單機(jī)啟動(dòng)就本機(jī)一個(gè)節(jié)點(diǎn)也無所謂尋址。

        接下來看下源碼部分如何實(shí)現(xiàn)的。在DistroProtocol類中有一個(gè)成員變量ServerMemberManager memberManager,尋址的邏輯即封裝在ServerMemberManager中。

        坐標(biāo):ServerMemberManager#init()

        protected void init() throws NacosException {
          Loggers.CORE.info("Nacos-related cluster resource initialization");

          // 注解@1 
          this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);
          // 注解@2
          this.localAddress = InetUtils.getSelfIP() + ":" + port;
          // 注解@3
          this.self = MemberUtil.singleParse(this.localAddress);
          // 注解@4
          this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);

          this.self.setAbilities(initMemberAbilities());
          // 注解@5
          serverList.put(self.getAddress(), self);
          // 注解@6
          registerClusterEvent();
          // 注解@7
          initAndStartLookup();

          if (serverList.isEmpty()) {
            throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");
          }

          Loggers.CORE.info("The cluster resource is initialized");
        }

        注解@1 可以通過server.port指定服務(wù)端端口,默認(rèn)8848

        注解@2 獲取本地地址

        注解@3 拆分IP和Port組裝Member對(duì)象

        注解@4 設(shè)置版本取自pom文件 version=${project.version}

        注解@5 緩存本節(jié)點(diǎn)信息

        注解@6 發(fā)布MembersChangeEvent事件并訂閱IPChangeEvent事件

        private void registerClusterEvent() {
          // 發(fā)布MembersChangeEvent事件
          NotifyCenter.registerToPublisher(MembersChangeEvent.class,
                                           EnvUtil.getProperty("nacos.member-change-event.queue.size", Integer.class, 128))
        ;
          // 訂閱IPChangeEvent事件
          NotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() {
            @Override
            public void onEvent(InetUtils.IPChangeEvent event) {
              String newAddress = event.getNewIP() + ":" + port;
              ServerMemberManager.this.localAddress = newAddress;
              EnvUtil.setLocalAddress(localAddress);

              Member self = ServerMemberManager.this.self;
              self.setIp(event.getNewIP());

              String oldAddress = event.getOldIP() + ":" + port;
              ServerMemberManager.this.serverList.remove(oldAddress);
              ServerMemberManager.this.serverList.put(newAddress, self);

              ServerMemberManager.this.memberAddressInfos.remove(oldAddress);
              ServerMemberManager.this.memberAddressInfos.add(newAddress);
            }

            @Override
            public Class<? extends Event> subscribeType() {
              return InetUtils.IPChangeEvent.class;
            }
          });

        }

        三、尋址適配器


        注解@7 初始化尋址模式適配器并啟動(dòng);尋址模式分別為單機(jī)、配置文件、地址服務(wù)

        private void initAndStartLookup() throws NacosException {
          // 注解@7.1
          this.lookup = LookupFactory.createLookUp(this);
          isUseAddressServer = this.lookup.useAddressServer();
          // 注解@7.2
          this.lookup.start();
        }

        注解@7.1 獲取尋址模式適配器

        public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {
          if (!EnvUtil.getStandaloneMode()) {
            // 注解@7.1.1
            String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);
            LookupType type = chooseLookup(lookupType);
            // 注解@7.1.2
            LOOK_UP = find(type);
            currentLookupType = type;
          } else {
            // 注解@7.1.3
            LOOK_UP = new StandaloneMemberLookup();
          }
          LOOK_UP.injectMemberManager(memberManager);
          Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());
          return LOOK_UP;
        }

        注解@7.1.1 尋址類型可以通過「nacos.core.member.lookup.type」參數(shù)指定,取值為「file」或者「address-server」

        注解@7.1.2 根據(jù)不同的類型實(shí)例化不同的MemberLookup分別為:FileConfigMemberLookup和AddressServerMemberLookup

        private static MemberLookup find(LookupType type) {
            if (LookupType.FILE_CONFIG.equals(type)) {
                LOOK_UP = new FileConfigMemberLookup();
                return LOOK_UP;
            }
            if (LookupType.ADDRESS_SERVER.equals(type)) {
                LOOK_UP = new AddressServerMemberLookup();
                return LOOK_UP;
            }
            throw new IllegalArgumentException();
        }

        注解@7.1.3 如果采用standalone模式實(shí)例化StandaloneMemberLookup

        注解@7.2 尋址適配器啟動(dòng)

        standalone尋址適配器啟動(dòng)

        public void start() {
          if (start.compareAndSet(falsetrue)) {
            String url = InetUtils.getSelfIP() + ":" + EnvUtil.getPort();
            afterLookup(MemberUtil.readServerConf(Collections.singletonList(url)));
          }
        }

        備注: 坐標(biāo)StandaloneMemberLookup#start(),獲取本地地址執(zhí)行afterLookup

        文件尋址適配器啟動(dòng)

        public void start() throws NacosException {
                if (start.compareAndSet(falsetrue)) {
                    readClusterConfFromDisk();
                    try {
                        WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
                    } catch (Throwable e) {
                        Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
                    }
                }
         }

         private void readClusterConfFromDisk() {
           Collection<Member> tmpMembers = new ArrayList<>();
           try {
             List<String> tmp = EnvUtil.readClusterConf(); // 從磁盤文件讀取節(jié)點(diǎn)列表
             tmpMembers = MemberUtil.readServerConf(tmp);
           } catch (Throwable e) {
             Loggers.CLUSTER
               .error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
           }
         
           afterLookup(tmpMembers);
         }

        備注: 默認(rèn)從 ${user.home}/nacos/conf/cluster.conf文件中讀取集群地址信息,文件格式為:「ip1:port,ip2:port」。讀取后執(zhí)行afterLookup。并注冊(cè)FileWatcher監(jiān)聽cluster.conf的變化,有變更會(huì)被監(jiān)聽并更新緩存地址列表。

        地址服務(wù)器尋址適配器

         public void start() throws NacosException {
           if (start.compareAndSet(falsetrue)) {
             this.maxFailCount = Integer.parseInt(EnvUtil.getProperty("maxHealthCheckFailCount""12"));
             initAddressSys();
             run();
           }
         }

        每5秒定時(shí)請(qǐng)求地址服務(wù)器

        private void run() throws NacosException {
            boolean success = false;
            Throwable ex = null;
            int maxRetry = EnvUtil.getProperty("nacos.core.address-server.retry", Integer.class, 5);
            for (int i = 0; i < maxRetry; i++) {
                try {
                    syncFromAddressUrl();
                    success = true;
                    break;
                } catch (Throwable e) {
                    ex = e;
                    Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
                }
            }
            if (!success) {
                throw new NacosException(NacosException.SERVER_ERROR, ex);
            }
            
            GlobalExecutor.scheduleByCommon(new AddressServerSyncTask(), 5_000L);
        }

        處理地址列表

        private void syncFromAddressUrl() throws Exception {
            RestResult<String> result = restTemplate
                    .get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType());
            if (result.ok()) {
                isAddressServerHealth = true;
                Reader reader = new StringReader(result.getData());
                try {
                    afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));
                } catch (Throwable e) {
                    Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}",
                            ExceptionUtil.getAllExceptionMsg(e));
                }
                addressServerFailCount = 0;
            } else {
                addressServerFailCount++;
                if (addressServerFailCount >= maxFailCount) {
                    isAddressServerHealth = false;
                }
                Loggers.CLUSTER.error("[serverlist] failed to get serverlist, error code {}", result.getCode());
            }
        }

        備注: 域名默認(rèn)為「jmenv.tbsite.net」可以通過參數(shù)「address.server.domain」指定服務(wù)器地址;端口默認(rèn)為「8080」可以通過參數(shù)「address.server.port」指定;url默認(rèn)為「/serverlist」可以通過參數(shù)指定「address.server.url」。

        默認(rèn)為:http://jmenv.tbsite.net:8080/serverlist;每5秒鐘定時(shí)向地址服務(wù)器請(qǐng)求獲取地址列表;獲取列表后執(zhí)行afterLookup。

        四、節(jié)點(diǎn)變更

        三種適配器尋址最后都調(diào)用到了afterLookup,接下來看下這塊邏輯。

        public void afterLookup(Collection<Member> members) {
            this.memberManager.memberChange(members);
        }

        synchronized boolean memberChange(Collection<Member> members) {

          if (members == null || members.isEmpty()) {
            return false;
          }
          // 是否包含本地地址
          boolean isContainSelfIp = members.stream()
            .anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));

          if (isContainSelfIp) {
            isInIpList = true;
          } else {
            isInIpList = false;
            members.add(this.self);
            Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members);
          }

          // 集群中地址列表是否有變化
          boolean hasChange = members.size() != serverList.size();
          ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();
          Set<String> tmpAddressInfo = new ConcurrentHashSet<>();
          for (Member member : members) {
            final String address = member.getAddress();
            Member existMember = serverList.get(address);
            if (existMember == null) { // 有新的節(jié)點(diǎn)加入
              hasChange = true;
             // 新增的節(jié)點(diǎn)先設(shè)置狀態(tài)為DOWN,過幾秒中通過心跳更改狀態(tài)UP。防止新節(jié)點(diǎn)未成功啟動(dòng)而發(fā)請(qǐng)求
             member.setState(NodeState.DOWN); 
              tmpMap.put(address, member);
            } else {
             // 已存在,還會(huì)被更新
              tmpMap.put(address, existMember); 
            }

            if (NodeState.UP.equals(member.getState())) {
              tmpAddressInfo.add(address);
            }
          }

          serverList = tmpMap;
          memberAddressInfos = tmpAddressInfo;

          Collection<Member> finalMembers = allMembers();

          Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);

          if (hasChange) { // 集群節(jié)點(diǎn)有變更
            MemberUtil.syncToFile(finalMembers); // 同步寫入磁盤文件cluster.conf中
            Event event = MembersChangeEvent.builder().members(finalMembers).build();
            NotifyCenter.publishEvent(event); // 發(fā)布MembersChangeEvent事件
          }

          return hasChange;
        }

        備注: 通過尋址適配器獲取的集群節(jié)點(diǎn)列表,會(huì)與緩存的節(jié)點(diǎn)信息進(jìn)行比較。如果有變更會(huì)更新緩存、把全部節(jié)點(diǎn)寫入磁盤文件cluster.conf、同時(shí)發(fā)布MembersChangeEvent事件。

        小結(jié): Nacos集群中的節(jié)點(diǎn)變更了怎么發(fā)現(xiàn)呢?Nacos提供兩種模式一個(gè)是通過動(dòng)態(tài)監(jiān)聽配置文件cluster.conf;另外一種是通過定時(shí)5秒去地址中心獲取。

        瀏覽 37
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            日韩美女黄片 | 99免费视频在线 | 亚洲黄色天堂 | 男生和女生搞鸡视频 | 亚洲一区二区网站 | 中文字幕國產亂伦 | 国产精品国产自产拍几百部网站 | 91在线看黄 | 刘亦菲一区二区三区免费看 | 国产片婬乱一级毛片视频明星 |