1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Nacos7# Distro協(xié)議增量同步

        共 59874字,需瀏覽 120分鐘

         ·

        2021-07-08 18:44

        引言

        本文接著擼Distro協(xié)議,上文中分析了在Nacos server啟動(dòng)時(shí)會(huì)進(jìn)行全量數(shù)據(jù)同步和數(shù)據(jù)校驗(yàn),具體數(shù)據(jù)即客戶端注冊節(jié)點(diǎn)信息含命名空間、分組名稱、服務(wù)名稱、節(jié)點(diǎn)Instance信息等。什么時(shí)候會(huì)觸發(fā)增量同步?增量同步都干了些啥,下文接著擼擼增量數(shù)據(jù)同步。

        一、內(nèi)容提要

        增量數(shù)據(jù)同步

        • 在Nacos節(jié)點(diǎn)啟動(dòng)時(shí)通過事件驅(qū)動(dòng)模式訂閱了ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent事件
        • 當(dāng)節(jié)點(diǎn)收到ClientChangedEvent事件時(shí),會(huì)向集群中其他節(jié)點(diǎn)發(fā)送更新Client信息請求,其他節(jié)點(diǎn)收到后更新緩存
        • 當(dāng)節(jié)點(diǎn)收到ClientVerifyFailedEvent事件時(shí),向該Event指定的目標(biāo)節(jié)點(diǎn)發(fā)起新增該Event指定的Client信息請求,目標(biāo)節(jié)點(diǎn)收到后更新到自己緩存中
        • 當(dāng)節(jié)點(diǎn)收到ClientDisconnectEvent事件時(shí),會(huì)向集群中其他節(jié)點(diǎn)發(fā)送刪除Client信息請求,其他節(jié)點(diǎn)收到后將該Client緩存刪除

        增量事件觸發(fā)

        • 當(dāng)有服務(wù)注冊或者注銷時(shí)會(huì)觸發(fā)ClientEvent.ClientChangedEvent事件,即客戶端調(diào)用naming.registerInstance或者naming.deregisterInstance
        • 定時(shí)任務(wù)每隔3秒鐘定時(shí)檢查緩存中的所有連接,如果超過保鮮期20秒則再次發(fā)起連接請求,連接未成功則注銷關(guān)閉該連接并發(fā)布ClientEvent.ClientDisconnectEvent事件
        • Nacos集群之間通過每5秒發(fā)送心跳校驗(yàn)數(shù)據(jù)請求(具體為本節(jié)點(diǎn)負(fù)責(zé)Client信息),其他節(jié)點(diǎn)接受到校驗(yàn)請求,如果緩存中存在該client表示校驗(yàn)成功,同時(shí)更新保鮮時(shí)間;否則校驗(yàn)失敗,回調(diào)返回失敗Response,請求節(jié)點(diǎn)收到失敗的Response后會(huì)發(fā)布ClientVerifyFailedEvent事件

        二、增量數(shù)據(jù)同步

        將代碼翻到DistroClientDataProcessor類中,該類繼承了SmartSubscriber,遵循Subscriber/Notify模式,即事件驅(qū)動(dòng)模式。該模式前面文章中分析過,當(dāng)有訂閱的事件時(shí)會(huì)進(jìn)行回調(diào)通知。

        訂閱的事件

        DistroClientDataProcessor訂閱了ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent事件。

        @Override
        public List<Class<? extends Event>> subscribeTypes() {
          List<Class<? extends Event>> result = new LinkedList<>();
          result.add(ClientEvent.ClientChangedEvent.class);
          result.add(ClientEvent.ClientDisconnectEvent.class);
          result.add(ClientEvent.ClientVerifyFailedEvent.class);
          return result;
        }

        當(dāng)有上述三個(gè)事件產(chǎn)生時(shí),DefaultPublisher回調(diào)onEvent方法。

        public void onEvent(Event event) {
            if (EnvUtil.getStandaloneMode()) {
                return;
            }
            if (!upgradeJudgement.isUseGrpcFeatures()) {
                return;
            }
            if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
               // 注解@1
                syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
            } else {
                // 注解@2
                syncToAllServer((ClientEvent) event);
            }
        }

        注解@1 將ClientVerifyFailedEvent同步給校驗(yàn)失敗的節(jié)點(diǎn),操作類型為ADD

        注解@2 將同步給集群中的其他節(jié)

        private void syncToAllServer(ClientEvent event) {
            Client client = event.getClient();
            // Only ephemeral data sync by Distro, persist client should sync by raft.
            if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
                return;
            }
            if (event instanceof ClientEvent.ClientDisconnectEvent) {
               // 注解@3
                DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
                distroProtocol.sync(distroKey, DataOperation.DELETE);
            } else if (event instanceof ClientEvent.ClientChangedEvent) {
               // 注解@4
                DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
                distroProtocol.sync(distroKey, DataOperation.CHANGE);
            }
        }

        注解@3 當(dāng)客戶端斷開連接事件ClientDisconnectEvent時(shí),向其他節(jié)點(diǎn)同步DELETE操作

        注解@4 當(dāng)客戶端變更事件ClientChangedEvent時(shí),向其他節(jié)點(diǎn)同步CHANGE操作

        接著看下不同操作類型的處理

        @Override
        public boolean process(NacosTask task) {
            if (!(task instanceof DistroDelayTask)) {
                return true;
            }
            DistroDelayTask distroDelayTask = (DistroDelayTask) task;
            DistroKey distroKey = distroDelayTask.getDistroKey();
            switch (distroDelayTask.getAction()) {
                case DELETE: // 刪除操作
                    DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
                    distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
                    return true;
                case CHANGE:
                case ADD: // 更新和新增操作
                    DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
                    distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
                    return true;
                default:
                    return false;
            }
        }

        向指定的集群節(jié)點(diǎn)同步更新數(shù)據(jù)

        @Override
        public boolean syncData(DistroData data, String targetServer) {
            if (isNoExistTarget(targetServer)) {
                return true;
            }
            // 構(gòu)造請求數(shù)據(jù)并設(shè)置數(shù)據(jù)類型
            DistroDataRequest request = new DistroDataRequest(data, data.getType());
            // 查找目標(biāo)節(jié)點(diǎn)緩存數(shù)據(jù)
            Member member = memberManager.find(targetServer);
            // 節(jié)點(diǎn)狀態(tài)檢查需UP狀態(tài),即:可通信狀態(tài)
            if (checkTargetServerStatusUnhealthy(member)) {
                Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
                return false;
            }
            try {
                // 向目標(biāo)節(jié)點(diǎn)發(fā)送數(shù)據(jù)
                Response response = clusterRpcClientProxy.sendRequest(member, request);
                return checkResponse(response);
            } catch (NacosException e) {
                Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);
            }
            return false;
        }

        異步更新操作

        @Override
        public void syncData(DistroData data, String targetServer, DistroCallback callback) {
            if (isNoExistTarget(targetServer)) {
                callback.onSuccess();
            }
            DistroDataRequest request = new DistroDataRequest(data, data.getType());
            Member member = memberManager.find(targetServer);
            try {
                // 異步更新操作
                clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member));
            } catch (NacosException nacosException) {
                callback.onFailed(nacosException);
            }
        }

        節(jié)點(diǎn)收到這些操作請求如何處理呢?

        代碼翻到DistroDataRequestHandler#handle(),集群中節(jié)點(diǎn)收到請求后處理邏輯在這里:

        @Override
        public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
            try {
                switch (request.getDataOperation()) {
                    case VERIFY:
                        return handleVerify(request.getDistroData(), meta);
                    case SNAPSHOT:
                        return handleSnapshot();
                    case ADD:
                    case CHANGE:
                    case DELETE:
                        return handleSyncData(request.getDistroData());
                    case QUERY:
                        return handleQueryData(request.getDistroData());
                    default:
                        return new DistroDataResponse();
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
                DistroDataResponse result = new DistroDataResponse();
                result.setErrorCode(ResponseCode.FAIL.getCode());
                result.setMessage("handle distro request with exception");
                return result;
            }
        }

        可以看出ADD、CHANGE和DELETE均由handleSyncData處理。

        private DistroDataResponse handleSyncData(DistroData distroData) {
            DistroDataResponse result = new DistroDataResponse();
            if (!distroProtocol.onReceive(distroData)) {
                result.setErrorCode(ResponseCode.FAIL.getCode());
                result.setMessage("[DISTRO-FAILED] distro data handle failed");
            }
            return result;
        }
        @Override
        public boolean processData(DistroData distroData) {
            switch (distroData.getType()) {
                case ADD:
                case CHANGE:
                    ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                            .deserialize(distroData.getContent(), ClientSyncData.class)
        ;
                    handlerClientSyncData(clientSyncData); // 注解@5
                    return true;
                case DELETE:
                    String deleteClientId = distroData.getDistroKey().getResourceKey();
                    Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
                    clientManager.clientDisconnected(deleteClientId); // 注解@6
                    return true;
                default:
                    return false;
            }
        }

        注解@5 將同步過來的Client信息進(jìn)行緩存

        private void handlerClientSyncData(ClientSyncData clientSyncData) {
            Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
            clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
            Client client = clientManager.getClient(clientSyncData.getClientId());
           // 注解@5.1
            upgradeClient(client, clientSyncData);
        }

        需要的是從其他節(jié)點(diǎn)通過過來的Client信息,ConnectionBasedClient屬性isNative為false表示該連接時(shí)從其他節(jié)點(diǎn)同步過來的;true表示該連接客戶端直接連接的。

        public boolean syncClientConnected(String clientId, ClientSyncAttributes attributes) {
            String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE);
            ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
            return clientConnected(clientFactory.newSyncedClient(clientId, attributes));
        }

        @Override
        public ConnectionBasedClient newSyncedClient(String clientId, ClientSyncAttributes attributes) {
          return new ConnectionBasedClient(clientId, false); // false表示從其他節(jié)點(diǎn)同步過來的client
        }

        @Override
        public boolean clientConnected(Client client) {
          Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
          if (!clients.containsKey(client.getClientId())) {
            clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client); // 緩存client
          }
          return true;
        }

        注解@5.1  更新Client的Service以及Instance信息。

        private void upgradeClient(Client client, ClientSyncData clientSyncData) {

            List<String> namespaces = clientSyncData.getNamespaces();
            List<String> groupNames = clientSyncData.getGroupNames();
            List<String> serviceNames = clientSyncData.getServiceNames();
            List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
            Set<Service> syncedService = new HashSet<>();
            for (int i = 0; i < namespaces.size(); i++) {
                Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
                Service singleton = ServiceManager.getInstance().getSingleton(service);
                syncedService.add(singleton);
                InstancePublishInfo instancePublishInfo = instances.get(i);
                if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
                    client.addServiceInstance(singleton, instancePublishInfo);
                    NotifyCenter.publishEvent(
                            new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
                }
            }
            for (Service each : client.getAllPublishedService()) {
                if (!syncedService.contains(each)) {
                    client.removeServiceInstance(each);
                    NotifyCenter.publishEvent(
                            new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
                }
            }
        }

        注解@6 響應(yīng)刪除操作,從clients緩存中移除。

        @Override
        public boolean clientDisconnected(String clientId) {
            Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
            ConnectionBasedClient client = clients.remove(clientId);
            if (null == client) {
                return true;
            }
            client.release();
            NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
            return true;
        }

        小結(jié): 增量同步的邏輯如下:當(dāng)本節(jié)點(diǎn)DistroClientDataProcessor收到ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent事件時(shí),會(huì)向Nacos集群的其他節(jié)點(diǎn)同步Client信息;集群中其他節(jié)點(diǎn)收到同步信息后更新或者刪除本地緩存的Client信息;通過增量同步的Client信息isNative為false表示不是由客戶端直連的。

        三、增量事件觸發(fā)

        在Nacos server啟動(dòng)時(shí)從運(yùn)行時(shí)內(nèi)存信息可以看出,總共緩存了17個(gè)事件類型。當(dāng)然也包括ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent。

        ClientChangedEvent事件觸發(fā)

        當(dāng)處理服務(wù)注冊和注銷事件時(shí)會(huì)觸發(fā)ClientChangeEvent事件,詳見InstanceRequestHandler#handle處理邏輯。

        public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
            Service service = Service
                    .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
            switch (request.getType()) {
                // 注解@7
                case NamingRemoteConstants.REGISTER_INSTANCE:
                    return registerInstance(service, request, meta);
                // 注解@8
                case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                    return deregisterInstance(service, request, meta);
                default:
                    throw new NacosException(NacosException.INVALID_PARAM,
                            String.format("Unsupported request type %s", request.getType()));
            }
        }

        注解@7 處理注冊請求,會(huì)調(diào)用到addServiceInstance方法,該方法中發(fā)布了ClientEvent.ClientChangedEvent事件。

        public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
            if (null == publishers.put(service, instancePublishInfo)) {
                MetricsMonitor.incrementInstanceCount();
            }
            NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
            Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
            return true;
        }

        注解@8 處理注銷請求,會(huì)調(diào)用到removeServiceInstance方法,該方法中發(fā)布了ClientEvent.ClientChangedEvent事件

        public InstancePublishInfo removeServiceInstance(Service service) {
                InstancePublishInfo result = publishers.remove(service);
                if (null != result) {
                    MetricsMonitor.decrementInstanceCount();
                    NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
                }
                Loggers.SRV_LOG.info("Client remove for service {}, {}", service, getClientId());
                return result;
        }

        小結(jié): 當(dāng)有服務(wù)注冊或者注銷時(shí)會(huì)觸發(fā)ClientEvent.ClientChangedEvent事件。

        ClientDisconnectEvent事件觸發(fā)

        下面一段代碼通過檢測連接是否超過保鮮期,超過保鮮期的會(huì)被注銷關(guān)閉,翻到代碼ConnectionManager#start()。

        @PostConstruct
        public void start() {
            // 定時(shí)任務(wù)每3秒執(zhí)行一次
            RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 獲取緩存連接
                        int totalCount = connections.size();
                        Loggers.REMOTE_DIGEST.info("Connection check task start");
                        MetricsMonitor.getLongConnectionMonitor().set(totalCount);
                        // 所有連接集合
                        Set<Map.Entry<String, Connection>> entries = connections.entrySet();
                        // 獲取通過SDK連接的數(shù)量
                        int currentSdkClientCount = currentSdkClientCount();
                        boolean isLoaderClient = loadClient >= 0;
                        int currentMaxClient = isLoaderClient ? loadClient : connectionLimitRule.countLimit;

                        int expelCount = currentMaxClient < 0 ? 0 : Math.max(currentSdkClientCount - currentMaxClient, 0);

                        List<String> expelClient = new LinkedList<>();

                        Map<String, AtomicInteger> expelForIp = new HashMap<>(16);

                        // 1. calculate expel count  of ip.
                        // 加載Connection ConnectionLimitRule
                        // 默認(rèn)路徑為 ${usr.home}/nacos/data/loader/limitRule
                        for (Map.Entry<String, Connection> entry : entries) {

                            Connection client = entry.getValue();
                            String appName = client.getMetaInfo().getAppName();
                            String clientIp = client.getMetaInfo().getClientIp();
                            if (client.getMetaInfo().isSdkSource() && !expelForIp.containsKey(clientIp)) {
                                //get limit for current ip.
                                // 默認(rèn)無limit限制
                                int countLimitOfIp = connectionLimitRule.getCountLimitOfIp(clientIp);
                                // 默認(rèn)無limit限制
                                if (countLimitOfIp < 0) {
                                    int countLimitOfApp = connectionLimitRule.getCountLimitOfApp(appName);
                                    countLimitOfIp = countLimitOfApp < 0 ? countLimitOfIp : countLimitOfApp;
                                }
                                if (countLimitOfIp < 0) { // 默認(rèn)無限制
                                    countLimitOfIp = connectionLimitRule.getCountLimitPerClientIpDefault();
                                }

                                if (countLimitOfIp >= 0 && connectionForClientIp.containsKey(clientIp)) {
                                    AtomicInteger currentCountIp = connectionForClientIp.get(clientIp);
                                    if (currentCountIp != null && currentCountIp.get() > countLimitOfIp) {
                                        expelForIp.put(clientIp, new AtomicInteger(currentCountIp.get() - countLimitOfIp));
                                    }
                                }
                            }
                        }

                        if (expelForIp.size() > 0) { // 默認(rèn)等于0
                            Loggers.REMOTE_DIGEST.info("Over limit ip expel info,", expelForIp);
                        }

                        Set<String> outDatedConnections = new HashSet<>();
                        long now = System.currentTimeMillis();
                        // 2.get expel connection for ip limit.
                        //
                        for (Map.Entry<String, Connection> entry : entries) {
                            Connection client = entry.getValue();
                            String clientIp = client.getMetaInfo().getClientIp();
                            AtomicInteger integer = expelForIp.get(clientIp);
                            if (integer != null && integer.intValue() > 0) {
                                integer.decrementAndGet();
                                expelClient.add(client.getMetaInfo().getConnectionId());
                                expelCount--;
                            } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) { // 保鮮時(shí)間超過20秒放入outDatedConnections集合
                                outDatedConnections.add(client.getMetaInfo().getConnectionId());
                            }

                        }

                        // 3. if total count is still over limit.
                        // expelCount 默認(rèn)為0
                        if (expelCount > 0) {
                            for (Map.Entry<String, Connection> entry : entries) {
                                Connection client = entry.getValue();
                                if (!expelForIp.containsKey(client.getMetaInfo().clientIp) && client.getMetaInfo()
                                        .isSdkSource() && expelCount > 0) {
                                    expelClient.add(client.getMetaInfo().getConnectionId());
                                    expelCount--;
                                    outDatedConnections.remove(client.getMetaInfo().getConnectionId());
                                }
                            }
                        }

                        String serverIp = null;
                        String serverPort = null;
                        if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) {
                            String[] split = redirectAddress.split(Constants.COLON);
                            serverIp = split[0];
                            serverPort = split[1];
                        }

                        for (String expelledClientId : expelClient) { // 默認(rèn)空集合
                            try {
                                Connection connection = getConnection(expelledClientId);
                                if (connection != null) {
                                    ConnectResetRequest connectResetRequest = new ConnectResetRequest();
                                    connectResetRequest.setServerIp(serverIp);
                                    connectResetRequest.setServerPort(serverPort);
                                    connection.asyncRequest(connectResetRequest, null);
                                }

                            } catch (ConnectionAlreadyClosedException e) {
                                unregister(expelledClientId);
                            } catch (Exception e) {
                                Loggers.REMOTE_DIGEST.error("Error occurs when expel connection :", expelledClientId, e);
                            }
                        }

                        //4.client active detection.
                        Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
                        // 超過保鮮期的鏈接集合
                        if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                            Set<String> successConnections = new HashSet<>();
                            final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                            for (String outDateConnectionId : outDatedConnections) {
                                try {
                                    Connection connection = getConnection(outDateConnectionId);
                                    if (connection != null) {
                                        ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                                        // 超過保鮮時(shí)間的連接,重新異步發(fā)起連接
                                        connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                            @Override
                                            public Executor getExecutor() {
                                                return null;
                                            }

                                            @Override
                                            public long getTimeout() {
                                                return 1000L;
                                            }

                                            @Override
                                            public void onResponse(Response response) {
                                                latch.countDown();
                                                if (response != null && response.isSuccess()) {
                                                    // 刷新激活時(shí)間
                                                    connection.freshActiveTime();
                                                    successConnections.add(outDateConnectionId);
                                                }
                                            }

                                            @Override
                                            public void onException(Throwable e) {
                                                latch.countDown();
                                            }
                                        });
               
                                    } else {
                                        latch.countDown();
                                    }

                                } catch (ConnectionAlreadyClosedException e) {
                                    latch.countDown();
                                } catch (Exception e) {
                                    // ... 
                                    latch.countDown();
                                }
                            }

                            latch.await(3000L, TimeUnit.MILLISECONDS);
                            Loggers.REMOTE_DIGEST
                                    .info("Out dated connection check successCount={}", successConnections.size());

                            // 無效連接集合
                            for (String outDateConnectionId : outDatedConnections) {
                                if (!successConnections.contains(outDateConnectionId)) {
                                    Loggers.REMOTE_DIGEST
                                            .info("[{}]Unregister Out dated connection....", outDateConnectionId);
                                    // 注銷關(guān)閉connection
                                    unregister(outDateConnectionId);
                                }
                            }
                        }

                        if (isLoaderClient) {  // 重置
                            loadClient = -1;
                            redirectAddress = null;
                        }

                    } catch (Throwable e) {
                       
                    }
                }
            }, 1000L3000L, TimeUnit.MILLISECONDS);

        }
        public synchronized void unregister(String connectionId) {
            Connection remove = this.connections.remove(connectionId);
            if (remove != null) {
                String clientIp = remove.getMetaInfo().clientIp;
                AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
                if (atomicInteger != null) {
                    int count = atomicInteger.decrementAndGet();
                    if (count <= 0) {
                        connectionForClientIp.remove(clientIp);
                    }
                }
                remove.close();
                Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
                clientConnectionEventListenerRegistry.notifyClientDisConnected(remove); // 異步
            }
        }

        public void notifyClientDisConnected(final Connection connection) {
                
                for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
                    try {
                        clientConnectionEventListener.clientDisConnected(connection);
                    } catch (Throwable throwable) {
                        Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",
                                clientConnectionEventListener.getName(), throwable);
                    }
                }
                
         }

        @Override
        public boolean clientDisconnected(String clientId) {
          Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
          ConnectionBasedClient client = clients.remove(clientId);
          if (null == client) {
            return true;
          }
          client.release();
          NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client)); // 發(fā)布ClientDisconnectEvent事件
          return true;
        }

        小結(jié): 連接可以配置限制規(guī)則具體在${usr.home}/nacos/data/loader/limitRule文件配置,默認(rèn)無限制;通過定時(shí)任務(wù)每隔3秒鐘定時(shí)檢查緩存中的所有連接包括通過來源sdk的連接和集群的連接;如果連接超過保鮮期20秒,并再次發(fā)起連接請求,未能連接成功則注銷關(guān)閉該連接;注銷關(guān)閉時(shí)發(fā)布ClientEvent.ClientDisconnectEvent事件。

        ClientVerifyFailedEvent事件觸發(fā)

        上一篇文章中梳理了Nacos集群中,每個(gè)節(jié)點(diǎn)會(huì)對集群中其他節(jié)點(diǎn)每隔5秒發(fā)送校驗(yàn)數(shù)據(jù),也就是心跳。當(dāng)校驗(yàn)的結(jié)果會(huì)進(jìn)行回調(diào)(gRPC為例),我們翻著看看這部分。

        public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
            if (isNoExistTarget(targetServer)) {
                callback.onSuccess();
            }
            DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
            Member member = memberManager.find(targetServer);
            try {
                DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
                        verifyData.getDistroKey().getResourceKey(), callback, member);
                clusterRpcClientProxy.asyncRequest(member, request, wrapper); // 向其他節(jié)點(diǎn)發(fā)送本節(jié)點(diǎn)負(fù)責(zé)的cleintId信息
            } catch (NacosException nacosException) {
                callback.onFailed(nacosException);
            }
        }

        重點(diǎn)看下DistroVerifyCallbackWrapper部分,校驗(yàn)失敗發(fā)布ClientVerifyFailedEvent事件。

        @Override
        public void onResponse(Response response) {
            if (checkResponse(response)) {
                NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());
                distroCallback.onSuccess();
            } else {
                Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);
               // 校驗(yàn)失敗發(fā)布ClientVerifyFailedEvent事件
                NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer));
                NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());
                distroCallback.onFailed(null);
            }
        }

        最后看下ClientVerifyFailedEvent這個(gè)類,關(guān)注下成員變量包含了clientId和targetServer。當(dāng)收到ClientVerifyFailedEvent時(shí)用于向targetServer目標(biāo)節(jié)點(diǎn)添加客戶端clientId信息。

        public static class ClientVerifyFailedEvent extends ClientEvent {

            private static final long serialVersionUID = 2023951686223780851L;

            private final String clientId;
            
            private final String targetServer;
            
            public ClientVerifyFailedEvent(String clientId, String targetServer) {
                super(null);
                this.clientId = clientId;
                this.targetServer = targetServer;
            }
            
            public String getClientId() {
                return clientId;
            }
            
            public String getTargetServer() {
                return targetServer;
            }
        }

        小結(jié): Nacos集群之間通過每5秒發(fā)送心跳校驗(yàn)數(shù)據(jù)請求(具體為本節(jié)點(diǎn)負(fù)責(zé)Client信息),其他節(jié)點(diǎn)接受到校驗(yàn)請求,如果緩存中存在該client表示校驗(yàn)成功,同時(shí)更新保鮮時(shí)間;否則校驗(yàn)失敗,回調(diào)返回失敗Response,請求節(jié)點(diǎn)收到失敗的Response后會(huì)發(fā)布ClientVerifyFailedEvent事件。

        瀏覽 73
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            大尺度床戏做爰 | 超碰永久在线 | 成年视频在线 | 国产成人精品自拍 | 国产一级做a爱免费视频 | 欧美寡妇xxxx黑人猛交 | 青青草大香蕉在线 | 神马影院午夜福利 | 插穴AV | 婷婷丁香五月社区亚洲 |