Nacos5# Distro協(xié)議尋址模式
引言
在Nacos服務(wù)端分析服務(wù)注冊(cè)邏輯,就繞不開Distro協(xié)議。該協(xié)議為臨時(shí)一致性協(xié)議,數(shù)據(jù)存儲(chǔ)在緩存中。阿里專門為注冊(cè)中心而設(shè)計(jì)的。后面文章逐步還原該協(xié)議承擔(dān)的職責(zé),本文先分析尋址模式。
尋址概念
尋址是指如何發(fā)現(xiàn)Nacos集群中節(jié)點(diǎn)變化的,當(dāng)檢測(cè)到變化時(shí)能后及時(shí)更新節(jié)點(diǎn)信息
尋址模式
Nacos支持兩種尋址模式分別為「文件尋址」和「地址服務(wù)器尋址」 默認(rèn)為文件尋址,可以通過參數(shù)「nacos.core.member.lookup.type」設(shè)置取值為「file」或者「address-server」 文件尋址路徑默認(rèn)為 「${user.home}/nacos/conf/cluster.conf」 文件尋址cluster.conf配置文件的內(nèi)容格式為「ip1:port,ip2:port」 地址服務(wù)器尋址默認(rèn)為:http://jmenv.tbsite.net:8080/serverlist;其中域名、端口、url均可自定義 檢測(cè)到集群節(jié)點(diǎn)變更時(shí)會(huì)更新緩存并發(fā)布MembersChangeEvent事件 為防止新節(jié)點(diǎn)沒有初始化好,當(dāng)檢測(cè)到新節(jié)點(diǎn)加入時(shí)先設(shè)置該節(jié)點(diǎn)狀態(tài)為DOWN,該節(jié)點(diǎn)不參與通信 過幾秒通過節(jié)點(diǎn)之間通信將已初始化的新節(jié)點(diǎn)狀態(tài)由DOWN設(shè)置為UP,該節(jié)點(diǎn)正式參與通信
尋址是指如何發(fā)現(xiàn)Nacos集群中節(jié)點(diǎn)變化的,當(dāng)檢測(cè)到變化時(shí)能后及時(shí)更新節(jié)點(diǎn)信息。Nacos提供了兩種尋址模式,分別為 文件尋址 和地址服務(wù)器尋址。如果單機(jī)啟動(dòng)就本機(jī)一個(gè)節(jié)點(diǎn)也無所謂尋址。
接下來看下源碼部分如何實(shí)現(xiàn)的。在DistroProtocol類中有一個(gè)成員變量ServerMemberManager memberManager,尋址的邏輯即封裝在ServerMemberManager中。
坐標(biāo):ServerMemberManager#init()
protected void init() throws NacosException {
Loggers.CORE.info("Nacos-related cluster resource initialization");
// 注解@1
this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);
// 注解@2
this.localAddress = InetUtils.getSelfIP() + ":" + port;
// 注解@3
this.self = MemberUtil.singleParse(this.localAddress);
// 注解@4
this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
this.self.setAbilities(initMemberAbilities());
// 注解@5
serverList.put(self.getAddress(), self);
// 注解@6
registerClusterEvent();
// 注解@7
initAndStartLookup();
if (serverList.isEmpty()) {
throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");
}
Loggers.CORE.info("The cluster resource is initialized");
}
注解@1 可以通過server.port指定服務(wù)端端口,默認(rèn)8848
注解@2 獲取本地地址
注解@3 拆分IP和Port組裝Member對(duì)象
注解@4 設(shè)置版本取自pom文件 version=${project.version}
注解@5 緩存本節(jié)點(diǎn)信息
注解@6 發(fā)布MembersChangeEvent事件并訂閱IPChangeEvent事件
private void registerClusterEvent() {
// 發(fā)布MembersChangeEvent事件
NotifyCenter.registerToPublisher(MembersChangeEvent.class,
EnvUtil.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));
// 訂閱IPChangeEvent事件
NotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() {
@Override
public void onEvent(InetUtils.IPChangeEvent event) {
String newAddress = event.getNewIP() + ":" + port;
ServerMemberManager.this.localAddress = newAddress;
EnvUtil.setLocalAddress(localAddress);
Member self = ServerMemberManager.this.self;
self.setIp(event.getNewIP());
String oldAddress = event.getOldIP() + ":" + port;
ServerMemberManager.this.serverList.remove(oldAddress);
ServerMemberManager.this.serverList.put(newAddress, self);
ServerMemberManager.this.memberAddressInfos.remove(oldAddress);
ServerMemberManager.this.memberAddressInfos.add(newAddress);
}
@Override
public Class<? extends Event> subscribeType() {
return InetUtils.IPChangeEvent.class;
}
});
}
注解@7 初始化尋址模式適配器并啟動(dòng);尋址模式分別為單機(jī)、配置文件、地址服務(wù)
private void initAndStartLookup() throws NacosException {
// 注解@7.1
this.lookup = LookupFactory.createLookUp(this);
isUseAddressServer = this.lookup.useAddressServer();
// 注解@7.2
this.lookup.start();
}
注解@7.1 獲取尋址模式適配器
public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {
if (!EnvUtil.getStandaloneMode()) {
// 注解@7.1.1
String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);
LookupType type = chooseLookup(lookupType);
// 注解@7.1.2
LOOK_UP = find(type);
currentLookupType = type;
} else {
// 注解@7.1.3
LOOK_UP = new StandaloneMemberLookup();
}
LOOK_UP.injectMemberManager(memberManager);
Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());
return LOOK_UP;
}
注解@7.1.1 尋址類型可以通過「nacos.core.member.lookup.type」參數(shù)指定,取值為「file」或者「address-server」
注解@7.1.2 根據(jù)不同的類型實(shí)例化不同的MemberLookup分別為:FileConfigMemberLookup和AddressServerMemberLookup
private static MemberLookup find(LookupType type) {
if (LookupType.FILE_CONFIG.equals(type)) {
LOOK_UP = new FileConfigMemberLookup();
return LOOK_UP;
}
if (LookupType.ADDRESS_SERVER.equals(type)) {
LOOK_UP = new AddressServerMemberLookup();
return LOOK_UP;
}
throw new IllegalArgumentException();
}
注解@7.1.3 如果采用standalone模式實(shí)例化StandaloneMemberLookup
注解@7.2 尋址適配器啟動(dòng)
standalone尋址適配器啟動(dòng)
public void start() {
if (start.compareAndSet(false, true)) {
String url = InetUtils.getSelfIP() + ":" + EnvUtil.getPort();
afterLookup(MemberUtil.readServerConf(Collections.singletonList(url)));
}
}
備注: 坐標(biāo)StandaloneMemberLookup#start(),獲取本地地址執(zhí)行afterLookup
文件尋址適配器啟動(dòng)
public void start() throws NacosException {
if (start.compareAndSet(false, true)) {
readClusterConfFromDisk();
try {
WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
} catch (Throwable e) {
Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
}
}
}
private void readClusterConfFromDisk() {
Collection<Member> tmpMembers = new ArrayList<>();
try {
List<String> tmp = EnvUtil.readClusterConf(); // 從磁盤文件讀取節(jié)點(diǎn)列表
tmpMembers = MemberUtil.readServerConf(tmp);
} catch (Throwable e) {
Loggers.CLUSTER
.error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
}
afterLookup(tmpMembers);
}
備注: 默認(rèn)從 ${user.home}/nacos/conf/cluster.conf文件中讀取集群地址信息,文件格式為:「ip1:port,ip2:port」。讀取后執(zhí)行afterLookup。并注冊(cè)FileWatcher監(jiān)聽cluster.conf的變化,有變更會(huì)被監(jiān)聽并更新緩存地址列表。
地址服務(wù)器尋址適配器
public void start() throws NacosException {
if (start.compareAndSet(false, true)) {
this.maxFailCount = Integer.parseInt(EnvUtil.getProperty("maxHealthCheckFailCount", "12"));
initAddressSys();
run();
}
}
每5秒定時(shí)請(qǐng)求地址服務(wù)器
private void run() throws NacosException {
boolean success = false;
Throwable ex = null;
int maxRetry = EnvUtil.getProperty("nacos.core.address-server.retry", Integer.class, 5);
for (int i = 0; i < maxRetry; i++) {
try {
syncFromAddressUrl();
success = true;
break;
} catch (Throwable e) {
ex = e;
Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
}
}
if (!success) {
throw new NacosException(NacosException.SERVER_ERROR, ex);
}
GlobalExecutor.scheduleByCommon(new AddressServerSyncTask(), 5_000L);
}
處理地址列表
private void syncFromAddressUrl() throws Exception {
RestResult<String> result = restTemplate
.get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType());
if (result.ok()) {
isAddressServerHealth = true;
Reader reader = new StringReader(result.getData());
try {
afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));
} catch (Throwable e) {
Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}",
ExceptionUtil.getAllExceptionMsg(e));
}
addressServerFailCount = 0;
} else {
addressServerFailCount++;
if (addressServerFailCount >= maxFailCount) {
isAddressServerHealth = false;
}
Loggers.CLUSTER.error("[serverlist] failed to get serverlist, error code {}", result.getCode());
}
}
備注: 域名默認(rèn)為「jmenv.tbsite.net」可以通過參數(shù)「address.server.domain」指定服務(wù)器地址;端口默認(rèn)為「8080」可以通過參數(shù)「address.server.port」指定;url默認(rèn)為「/serverlist」可以通過參數(shù)指定「address.server.url」。
默認(rèn)為:http://jmenv.tbsite.net:8080/serverlist;每5秒鐘定時(shí)向地址服務(wù)器請(qǐng)求獲取地址列表;獲取列表后執(zhí)行afterLookup。
三種適配器尋址最后都調(diào)用到了afterLookup,接下來看下這塊邏輯。
public void afterLookup(Collection<Member> members) {
this.memberManager.memberChange(members);
}
synchronized boolean memberChange(Collection<Member> members) {
if (members == null || members.isEmpty()) {
return false;
}
// 是否包含本地地址
boolean isContainSelfIp = members.stream()
.anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));
if (isContainSelfIp) {
isInIpList = true;
} else {
isInIpList = false;
members.add(this.self);
Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members);
}
// 集群中地址列表是否有變化
boolean hasChange = members.size() != serverList.size();
ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();
Set<String> tmpAddressInfo = new ConcurrentHashSet<>();
for (Member member : members) {
final String address = member.getAddress();
Member existMember = serverList.get(address);
if (existMember == null) { // 有新的節(jié)點(diǎn)加入
hasChange = true;
// 新增的節(jié)點(diǎn)先設(shè)置狀態(tài)為DOWN,過幾秒中通過心跳更改狀態(tài)UP。防止新節(jié)點(diǎn)未成功啟動(dòng)而發(fā)請(qǐng)求
member.setState(NodeState.DOWN);
tmpMap.put(address, member);
} else {
// 已存在,還會(huì)被更新
tmpMap.put(address, existMember);
}
if (NodeState.UP.equals(member.getState())) {
tmpAddressInfo.add(address);
}
}
serverList = tmpMap;
memberAddressInfos = tmpAddressInfo;
Collection<Member> finalMembers = allMembers();
Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);
if (hasChange) { // 集群節(jié)點(diǎn)有變更
MemberUtil.syncToFile(finalMembers); // 同步寫入磁盤文件cluster.conf中
Event event = MembersChangeEvent.builder().members(finalMembers).build();
NotifyCenter.publishEvent(event); // 發(fā)布MembersChangeEvent事件
}
return hasChange;
}
備注: 通過尋址適配器獲取的集群節(jié)點(diǎn)列表,會(huì)與緩存的節(jié)點(diǎn)信息進(jìn)行比較。如果有變更會(huì)更新緩存、把全部節(jié)點(diǎn)寫入磁盤文件cluster.conf、同時(shí)發(fā)布MembersChangeEvent事件。
小結(jié): Nacos集群中的節(jié)點(diǎn)變更了怎么發(fā)現(xiàn)呢?Nacos提供兩種模式一個(gè)是通過動(dòng)態(tài)監(jiān)聽配置文件cluster.conf;另外一種是通過定時(shí)5秒去地址中心獲取。
