別用ZkClient了,Curator才是ZooKeeper的好伴侶.
zookeeper作為分布式系統(tǒng)中重要的協(xié)調(diào)組件,在后端開發(fā)中是難以繞開的一個重要知識領(lǐng)域。
可以說,只要在后端領(lǐng)域,比如說Java開發(fā)、大數(shù)據(jù)開發(fā)中待過三年及以上的工程師,或多或少都接觸過或者直接使用過zookeeper。
因此筆者開啟本系列,作為自己學(xué)習(xí)zookeeper(后文均稱為zk)的記錄,如果能夠啟發(fā)讀者那就更好不過了。
zookeeper概述
?ZooKeeper是一個分布式的,開放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù),是Google的Chubby一個開源的實現(xiàn),是Hadoop和Hbase的重要組件。
它是一個為分布式應(yīng)用提供一致性服務(wù)的軟件,提供的功能包括:配置維護(hù)、域名服務(wù)、分布式同步、組服務(wù)等。
?
度娘如是說。
zookeeper,直譯過來就是動物園管理員,之所以這么說,主要是因為它在大數(shù)據(jù)技術(shù)棧中扮演了重要的協(xié)調(diào)角色。
在hadoop技術(shù)棧中,各種技術(shù)的logo都是小動物,而zookeeper的官方logo也是一位園丁模樣的男士。這也直觀地告訴了使用者,zk的用途和角色。
一言蔽之,zk就是在分布式系統(tǒng)中,對應(yīng)用提供一致性保證,分布式選主,通過各種機(jī)制,對應(yīng)用進(jìn)行協(xié)調(diào),從而使分布式系統(tǒng)對外提供某種特定的服務(wù)。

關(guān)于如何搭建zookeeper,網(wǎng)絡(luò)上文章有很多,本文就不進(jìn)行展開,感興趣的可以自行查找相關(guān)資料。
https://zookeeper.apache.org/
curator概述
?Apache Curator是Netflix公司開源的一套zookeeper客戶端框架,并貢獻(xiàn)給了apache社區(qū)。
它封裝了Zookeeper客戶端底層的api,提供了流式風(fēng)格的api,提供了很多開箱即用的高級特性,如:分布式選主、分布式鎖、path監(jiān)控、node監(jiān)控、更加易用的節(jié)點CRUD操作、分布式隊列等。
Patrixck Hunt(Zookeeper的commiter)以一句“Guava is to Java that Curator to Zookeeper”給予Curator高度評價。
?
目前主流的zookeeper客戶端共有三種:
- 官方zookeeper客戶端
- zkClient
- curator
其中,官方的客戶端提供的api都比較底層,開發(fā)者直接拿來用需要進(jìn)行一定的封裝,否則直接使用會顯得過于復(fù)雜和繁瑣;zkclient雖然使用起來比較方便,但是文檔較少,社區(qū)也不太活躍;而curator則是apache頂級項目,擁有活躍的開源社區(qū),且擁有較多的成熟api和高級特性。
因此在zk的客戶端這個領(lǐng)域,curator大受推崇。
http://curator.apache.org/index.html

curator基礎(chǔ)操作
我們通過代碼來直接感受一下curator操作的快捷。
首先需要在工程中引入curator的依賴:
????
????
??????org.apache.curator
??????curator-framework
??????4.3.0
????
????
????
??????org.apache.curator
??????curator-recipes
??????4.3.0
????
「注意」 筆者使用的zookeeper服務(wù)端版本為3.5.6,因此使用4.0.0以上版本的curator是兼容的。
對于較低版本的zookeeper服務(wù)端,如3.4.x,則需要依賴curator2.x版本,如:2.12.0。如果使用高版本的curator,需要將curator自身依賴的ZooKeeper在maven中exclude掉。并引入對應(yīng)的低版本zookeeper客戶端。
關(guān)于curator與zookeeper具體的版本依賴,請參考官方的說明 ZooKeeper Version Compatibility
創(chuàng)建客戶端實例
使用Curator第一步是要創(chuàng)建一個客戶端實例,代碼如下:(后續(xù)的操作中,第一步均是創(chuàng)建客戶端實例。后續(xù)講解過程中只粘貼關(guān)鍵代碼,在文章的末尾會粘貼本文中所有的代碼案例的完整代碼)。
????RetryPolicy?retry?=?new?ExponentialBackoffRetry(1000,?3);
????CuratorFramework?client?=
????????????CuratorFrameworkFactory.newClient("127.0.0.1:2181",?retry);
????client.start();
????System.out.println("啟動curator客戶端");
簡單解釋下這段代碼的含義:
首先創(chuàng)建一個重試策略實例RetryPolicy,當(dāng)客戶端與zk服務(wù)端連接失敗或者超時,curator會使用我們指定的 重試策略進(jìn)行重試。RetryPolicy有多個實現(xiàn),這里使用ExponentialBackoffRetry策略,重試三次,每次間隔1秒鐘。
通過CuratorFrameworkFactory創(chuàng)建一個CuratorFramework實例,傳入zk連接地址以及重試策略。示例代碼中為單機(jī)方式連接串,如果是多節(jié)點方式只需要通過半角逗號分割的方式進(jìn)行連接即可。
????多節(jié)點連接串
????ip0:port0,ip1:port1,ip2:port2
- 調(diào)用CuratorFramework.start()方法,與zk服務(wù)端建立連接。
如此,我們的客戶端便能夠與zk服務(wù)端建立起長連接,從而為各種交互做準(zhǔn)備。
節(jié)點操作
連接建立好后,我們來學(xué)習(xí)一下curator如何對zk節(jié)點進(jìn)行操作。通俗地說就是通過curator對zk的node做增刪改查操作。
????????try?{
????????????//?增加
????????????client.create()
????????????????????.creatingParentsIfNeeded()
????????????????????.withMode(CreateMode.PERSISTENT)
????????????????????.forPath("/snowalker/path",?"100".getBytes());
????????????????????
????????????//?讀取node的值
????????????byte[]?dataBytes?=?client.getData().forPath("/snowalker/path");
????????????
????????????System.out.println(new?String(dataBytes));
????????????//?修改node對應(yīng)的值
????????????client.setData().forPath("/snowalker/path",?"120".getBytes());
????????????byte[]?dataBytes1?=?client.getData().forPath("/snowalker/path");
????????????
????????????System.out.println(new?String(dataBytes1));
????????????//?獲取子節(jié)點
????????????List?children?=?client.getChildren().forPath("/snowalker");
????????????System.out.println(children);
????????????
????????????//?刪除節(jié)點
????????????client.delete().forPath("/snowalker/path");
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}?finally?{
????????????client.close();
????????}
這段代碼基本上就涵蓋了curator對zk的node進(jìn)行增刪改查的主流操作了。介紹下代碼含義:
- 「增」 首先,通過create方法在zk上創(chuàng)建了 ”/snowalker/path“ 這樣一個持久化節(jié)點。方法 creatingParentsIfNeeded() 表示 「如果有必要則創(chuàng)建父節(jié)點」,也就是遞歸地創(chuàng)建多個節(jié)點。
- 節(jié)點建立后,寫入value;value在zk的node上以字節(jié)形式進(jìn)行存儲;初始值為100,并進(jìn)行打印
- 「查」 通過CuratorFramework.getData().forPath("/snowalker/path") ?可以讀取對應(yīng)節(jié)點的value
- 「改」 接著我們通過CuratorFramework.setData() 修改”/snowalker/path“ 對應(yīng)的value為120,并進(jìn)行打印
- 如果想要獲取某個子節(jié)點,我們可以通過CuratorFramework.getChildren().forPath(path) 方法獲取,返回一個list;也就是說,zk的子節(jié)點是一對多的(zk文件系統(tǒng)是樹形結(jié)構(gòu))
- 「刪」 通過執(zhí)行CuratorFramework.delete().forPath(path) 能夠?qū)⒅付╬ath進(jìn)行刪除
運行代碼,觀察到日志打印如下:
????啟動curator客戶端
????100
????120
????[path]
使用zkcli連接zookeeper服務(wù)端,ls看一下 /snowalker 目錄下的節(jié)點:
????[zk:?localhost:2181(CONNECTED)?6]?ls?/
????[snowalker]
????[zk:?localhost:2181(CONNECTED)?7]?ls?/snowalker
????[]
????[zk:?localhost:2181(CONNECTED)?8]
當(dāng)前只有 /snowalker 節(jié)點存在,子節(jié)點已經(jīng)被刪除。
可以看到,通過curator,我們通過幾行代碼便實現(xiàn)了對zk node的增刪改查
curator進(jìn)階操作
zk提供了分布式選主、watcher動態(tài)監(jiān)聽等機(jī)制,能夠為分布式系統(tǒng)提供分布式協(xié)調(diào),配置實時變更通知等能力。Curator當(dāng)然也提供了對應(yīng)的API供我們進(jìn)行調(diào)用。
接下來我們就分別看一下如何使用Curator來使用這些能力。
集群選主
首先看一下如何利用Curator實現(xiàn)集群選主。
Curator提供兩種方式進(jìn)行集群選主,分別為:
- LeaderLatch方式
- LeaderElection方式
LeaderLatch方式
首先觀察一下leaderLatch選主方式調(diào)用方式:
????public?class?LeaderLatchDemo?{
????
????????public?static?void?main(String[]?args)?throws?Exception?{
????????????new?LeaderLatchDemo().leaderLatch();
????????}
????
????????public?void?leaderLatch()?throws?Exception?{
首先我們還是要實例化一個CuratorFramework客戶端,與zk服務(wù)端建立連接
????????????RetryPolicy?retry?=?new?ExponentialBackoffRetry(1000,?3);
????????????CuratorFramework?client?=
????????????????????CuratorFrameworkFactory.newClient(
????????????????????????????"127.0.0.1:2181",
????????????????????????????5000,
????????????????????????????3000,
????????????????????????????retry);
????????????client.start();
????????????System.out.println("啟動curator客戶端");
接著注冊一個連接狀態(tài)監(jiān)聽器,在回調(diào)方法中根據(jù)返回的連接狀態(tài)進(jìn)行對應(yīng)操作。
當(dāng)連接狀態(tài)ConnectionState為LOST時,表明客戶端到服務(wù)端的連接已經(jīng)斷開,如果當(dāng)前節(jié)點已經(jīng)是leader,那么我們就需要暫停leader身份下的一切事情。
如果我們查看源碼的話,會發(fā)現(xiàn)curator內(nèi)部會將是否為leader的狀態(tài)設(shè)置為false(已經(jīng)不是leader了)
????????????client.getConnectionStateListenable().addListener(new?ConnectionStateListener()?{
????????????????@Override
????????????????public?void?stateChanged(CuratorFramework?client,?ConnectionState?newState)?{
????????????????????switch?(newState)?{
????????????????????????case?LOST:?{
????????????????????????????break;
????????????????????????}
????????????????????}
????????????????}
????????????});
我們聲明一個路徑作為選主的依據(jù)。
通過 「LeaderLatch leaderLatch = new LeaderLatch(client, latchPath);」 實例化一個LeaderLatch實例,通過它進(jìn)行l(wèi)eader選舉操作。
????????????//?latch
????????????String?latchPath?=?"/snowalker/leader_latch";
????????????LeaderLatch?leaderLatch?=?new?LeaderLatch(client,?latchPath);
????????????//?開啟leader選舉過程
????????????leaderLatch.start();
????????????//?判斷當(dāng)前節(jié)點是否為leader
????????????boolean?hasLeadershipBefore?=?leaderLatch.hasLeadership();
????????????System.out.println("是否成為leader:"?+?hasLeadershipBefore);
????
????????????leaderLatch.await();????
當(dāng)通過 「leaderLatch.start()」 開啟leader選舉之后,我們需要調(diào)用 ?「leaderLatch.await()」 。
如果當(dāng)前的客戶端未成為leader,則會進(jìn)行等待,(內(nèi)部源碼實現(xiàn)是通過Object.wait()進(jìn)行阻塞) 直到成為leader后,當(dāng)前客戶端線程會被喚醒,繼續(xù)執(zhí)行后續(xù)邏輯。
這里說的后續(xù)邏輯,實際上就是客戶端作為leader節(jié)點需要執(zhí)行的業(yè)務(wù)邏輯。比如:hdfs中兩臺機(jī)器,當(dāng)其中一臺機(jī)器成為主節(jié)點就會以主節(jié)點的身份對外提供文件相關(guān)服務(wù),而另外一臺非leader機(jī)器則會await在這里,直到它成為leader才會作為主節(jié)點提供服務(wù)。(也就是說,只有被選為leader的節(jié)點才會真正的提供服務(wù),否則它看起來就好像 “假死” 了)
????????????boolean?hasLeadershipAfter?=?leaderLatch.hasLeadership();
????????????System.out.println("是否成為leader:"?+?hasLeadershipAfter);
????????????Thread.sleep(100000);
????????}
????}
最后我們再打印一下節(jié)點的狀態(tài),看當(dāng)前節(jié)點是否成為leader。
運行效果
我們同時啟動兩個LeaderLatchDemo主進(jìn)程,模擬雙節(jié)點下的leader選舉過程。
兩個客戶端控制臺打印如下:
?客戶端A
?
啟動curator客戶端
是否成為leader:false
?客戶端B
?
啟動curator客戶端
是否成為leader:false
是否成為leader:true
上述日志打印,表明開始階段,兩個客戶端均非leader。
當(dāng)經(jīng)過競爭之后,客戶端B成為leader,而客戶端A則阻塞。我們嘗試關(guān)閉客戶端B進(jìn)程,觀察客戶端A的控制臺日志打?。?/p>
?客戶端A
?
啟動curator客戶端
是否成為leader:false
是否成為leader:true
我們發(fā)現(xiàn),客戶端A成為了leader,從阻塞中喚醒。
這個小demo直觀地為我們展現(xiàn)了通過leaderLatch進(jìn)行l(wèi)eader選舉的場景。
LeaderElection方式
curator為我們提供了一種更為簡潔的leader選舉方式,它就是 「LeaderElection」 方式。(「實際上」,LeaderElection與LeaderLatch在原理上幾乎沒有差別,他們的原理都是基于分布式鎖實現(xiàn)的,只不過LeaderElection方式在使用上更加簡潔,開發(fā)效率更高)
話不多說,直接上代碼:
????public?class?LeaderElectionDemo?{
????
????????public?static?void?main(String[]?args)?throws?InterruptedException?{
首先還是實例化一個CuratorFramework建立到zk服務(wù)端的連接。
????????????RetryPolicy?retry?=?new?ExponentialBackoffRetry(1000,?3);
????????????CuratorFramework?client?=
????????????????????CuratorFrameworkFactory.newClient(
????????????????????????????"127.0.0.1:2181",
????????????????????????????5000,
????????????????????????????3000,
????????????????????????????retry);
????????????client.start();
????????????System.out.println("啟動curator客戶端");
接著定義一個leader選舉節(jié)點,這個操作和LeaderLatch相似。
????????????String?election?=?"/leader/election";
這里就不同了,我們需要建立一個LeaderSelector實例,它接收CuratorFramework實例、選舉節(jié)點、以及一個LeaderSelectorListener ?Leader選舉監(jiān)聽器。
我們需要實現(xiàn)LeaderSelectorListener的回調(diào)方法:
takeLeadership回調(diào)中需要開發(fā)者實現(xiàn)當(dāng)成為leader之后的業(yè)務(wù)邏輯。當(dāng)一個客戶端成為leader之后,便會回調(diào)takeLeadership方法,執(zhí)行l(wèi)eader角色的業(yè)務(wù)邏輯
stateChanged方法需要開發(fā)者實現(xiàn)當(dāng)連接狀態(tài)發(fā)生變化之后的業(yè)務(wù)邏輯。比如:我們可以直接拋出異常,阻止leader業(yè)務(wù)邏輯繼續(xù)進(jìn)行。待另外的節(jié)點成為leader后執(zhí)行takeLeadership方法
??????LeaderSelector?leaderSelector?=?new?LeaderSelector(
??????????????client,
??????????????election,
??????????????new?LeaderSelectorListener()?{
??????????????????@Override
??????????????????public?void?takeLeadership(CuratorFramework?curatorFramework)?throws?Exception?{
??????????????????????System.out.println("你已經(jīng)成為leader");
??????????????????????//?在?這里干leader的所有事情,此時方法不能退出
??????????????????????Thread.sleep(Integer.MAX_VALUE);
??????????????????}
??????
??????????????????@Override
??????????????????public?void?stateChanged(CuratorFramework?curatorFramework,?ConnectionState?connectionState)?{
??????????????????????System.out.println("你已經(jīng)不是leader,鏈接狀態(tài)發(fā)生變化,connectionState"?+?connectionState);
??????????????????????if?(connectionState.equals(ConnectionState.LOST))?{
??????????????????????????throw?new?CancelLeadershipException();
??????????????????????}
??????????????????}
??????????????});
通過調(diào)用LeaderSelector.start之后,多個客戶端會在election節(jié)點下競爭leader角色。當(dāng)某個客戶端競爭leader成功,就會執(zhí)行takeLeadership回調(diào)方法通知當(dāng)前應(yīng)用節(jié)點已經(jīng)成為leader。接著執(zhí)行l(wèi)eader角色的邏輯即可
????????????leaderSelector.start();
????????????Thread.sleep(Integer.MAX_VALUE);
????????}
????}
運行代碼
我們啟動兩個LeaderElectionDemo客戶端,讓他們進(jìn)行l(wèi)eader角色的選舉操作,觀察控制臺輸出:
客戶端A打印如下:
????啟動curator客戶端
????你已經(jīng)成為leader
客戶單B打印如下:
????啟動curator客戶端
這表明,客戶端A競爭leader成功,并成功執(zhí)行了回調(diào)方法takeLeadership??蛻舳薆競爭leader失敗,進(jìn)程阻塞。
我們強(qiáng)制關(guān)閉客戶端A,此時客戶端B控制臺輸出如下:
????啟動curator客戶端
????你已經(jīng)成為leader
這表明,客戶端A釋放了leader角色,客戶端B競爭成功,并開始執(zhí)行l(wèi)eader角色的方法。
「事實上」 LeaderElection方式內(nèi)部實現(xiàn)機(jī)制幾乎與LeaderLatch方式一模一樣,它本質(zhì)上也是通過分布式鎖競爭成為leader的。
具體到細(xì)節(jié)就是,LeaderElection是通過使用curator實現(xiàn)的mutex鎖進(jìn)行l(wèi)eader競爭。如果獲取到的鎖就是leader。如果競爭leader的時候競爭鎖失敗,則會阻塞,并為上個節(jié)點添加watcher。
當(dāng)上個節(jié)點對應(yīng)的客戶端down機(jī)或者長時間斷開連接,則順序臨時節(jié)點就消失了,此時watcher會通知后一個節(jié)點進(jìn)行加鎖。后面的節(jié)點加鎖成功便會成為leader角色。
我們發(fā)現(xiàn),這其實就是Curator的分布式鎖實現(xiàn)機(jī)制啊。
后續(xù)我們會對LeaderElection具體的代碼實現(xiàn)進(jìn)行展開講解。敬請期待。
小結(jié)
本文到此就告一段落了,我們對curator的基本使用以及重要的leader選舉特性進(jìn)行了全方位的講解和展示。
這里做個預(yù)告,接下來我會帶領(lǐng)讀者朋友們繼續(xù)學(xué)習(xí)curator對zookeeper的watcher機(jī)制的封裝和增強(qiáng)。
