1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        使用 Curator 操作 ZooKeeper

        共 6726字,需瀏覽 14分鐘

         ·

        2019-05-24 19:00

        Curator是Netflix公司開源的一個(gè)ZooKeeper client library,用于簡化ZooKeeper客戶端編程。它包含如下模塊:

        Framework:Framework是ZooKeeper API的High-Level的封裝,它讓訪問ZooKeeper更加簡單。它基于ZooKeeper添加了一些新的特性,同時(shí)屏蔽了訪問ZooKeeper集群在管理連接和重試操作方面的復(fù)雜度。

        Recipes:在Framework的基礎(chǔ)上,實(shí)現(xiàn)了一些通用的功能,稱之為“菜單”。

        Utilities:訪問ZooKeeper時(shí)候的一些公用方法。

        Client:一個(gè)Low-Level的ZooKeeper客戶端,并有一些公用方法。

        Errors:Curator的異常處理,包括連接問題,異?;謴?fù)等等。

        Extensions:

        連接ZooKeeper

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework _client = CuratorFrameworkFactory.newClient("10.23.22.237:2181", retryPolicy);
        _client.start();

        + View Code

        Curator通過CuratorFrameworkFactory來創(chuàng)建客戶端。new出來的客戶端可以保存并且重用。在使用之前需要start一下,絕大部分Curator的操作都必須先start。

        在new函數(shù)中需要傳入RetryPolicy接口,重連的策略。當(dāng)和ZooKeeper發(fā)生連接異?;蛘卟僮鳟惓5臅r(shí)候,就會(huì)使用重連策略。ExponentialBackoffRetry是其中一種重連策略。Curator支持很多種重連策略:RetryNTimes(重連N次策略)、RetryForever(永遠(yuǎn)重試策略)、ExponentialBackoffRetry(基于backoff的重連策略)、BoundedExponentialBackoffRetry(有邊界的基于backoff的重連策略,即,設(shè)定最大sleep時(shí)間)等等。

        ??

        下面是官方例子中,ExponentialBackoffRetry的代碼片段。

        + View Code

        long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
         if ( sleepMs > maxSleepMs )
        {
        log.warn(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
        sleepMs = maxSleepMs;
        }
        return sleepMs;


        可以看出ExponentialBackoffRetry 重連的時(shí)間間隔一般是隨著重試的次數(shù)遞增的,如果時(shí)間間隔計(jì)算出來大于默認(rèn)的最大sleep時(shí)間的話,則去最大sleep時(shí)間。ExponentialBackoffRetry 除了時(shí)間的限制以外,還有最大重連次數(shù)的限制。而BoundedExponentialBackoffRetry策略只是讓用戶設(shè)置最大sleep時(shí)間而已。默認(rèn)的最大時(shí)間是Integer.MAX_VALUE毫秒。

        ZooKeeper節(jié)點(diǎn)操作

        ZooKeeper 節(jié)點(diǎn)優(yōu)點(diǎn)像文件系統(tǒng)的文件夾,每個(gè)節(jié)點(diǎn)都可以包含數(shù)據(jù)。但是ZooKeeper的節(jié)點(diǎn)是有生命周期的,這取決于節(jié)點(diǎn)的類型。在 ZooKeeper 中,節(jié)點(diǎn)類型可以分為持久節(jié)點(diǎn)(PERSISTENT )、臨時(shí)節(jié)點(diǎn)(EPHEMERAL),以及時(shí)序節(jié)點(diǎn)(SEQUENTIAL ),具體在節(jié)點(diǎn)創(chuàng)建過程中,一般是組合使用,可以生成以下 4 種節(jié)點(diǎn)類型。不同的組合可以應(yīng)用到不同的業(yè)務(wù)場景中。

        ?

        1. 持久化節(jié)點(diǎn)

        持久化節(jié)點(diǎn)創(chuàng)建后,就一直存在,除非有刪除操作主動(dòng)來刪除這個(gè)節(jié)點(diǎn),持久化節(jié)點(diǎn)不會(huì)因?yàn)閯?chuàng)建該節(jié)點(diǎn)的客戶端會(huì)話失效而消失。如果重復(fù)創(chuàng)建,客戶端會(huì)拋出NodeExistsException異常。

        byte[] data = { 1, 2, 3 };
        _client.create().withMode(CreateMode.PERSISTENT).forPath("/zktest/p1", data);

        ?

        2. 臨時(shí)節(jié)點(diǎn)

        創(chuàng)建臨時(shí)節(jié)點(diǎn)后,如果客戶端會(huì)話失效,那么這個(gè)節(jié)點(diǎn)會(huì)自動(dòng)被ZooKeeper刪除。這里是客戶端失效,并不是客戶端斷開連接。因?yàn)閆ooKeeper服務(wù)端和客戶端是用心跳維持狀態(tài),會(huì)話留一點(diǎn)時(shí)間,這個(gè)時(shí)間是在創(chuàng)建連接的時(shí)候可以設(shè)置sessionTimeoutMs參數(shù):

        CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);

        創(chuàng)建臨時(shí)節(jié)點(diǎn)的代碼如下:

        _client.create().withMode(CreateMode.EPHEMERAL).forPath("/zktest/e1", data);

        ?

        3. 持久化時(shí)序節(jié)點(diǎn)

        _client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/zktest/ps1", data);

        上述代碼執(zhí)行兩次,你會(huì)發(fā)現(xiàn)客戶端不會(huì)報(bào)NodeExistsException異常,ZooKeeper會(huì)為你創(chuàng)建2個(gè)節(jié)點(diǎn),ZooKeeper在每個(gè)父節(jié)點(diǎn)會(huì)為他的第一級子節(jié)點(diǎn)維護(hù)一份時(shí)序,會(huì)記錄每個(gè)子節(jié)點(diǎn)創(chuàng)建的先后順序。在創(chuàng)建子節(jié)點(diǎn)的時(shí)候,可以設(shè)置這個(gè)屬性,那么在創(chuàng)建節(jié)點(diǎn)過程中,ZooKeeper會(huì)自動(dòng)為給定節(jié)點(diǎn)名加上一個(gè)數(shù)字后綴,作為新的節(jié)點(diǎn)名。

        Clipboard Image.png

        ?

        4. 臨時(shí)時(shí)序節(jié)點(diǎn)

        持久化時(shí)序節(jié)點(diǎn)不同的就是節(jié)點(diǎn)會(huì)在會(huì)話失效的時(shí)候回消失。

        _client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/zktest/es1", data);

        ?

        5. 設(shè)置和獲取節(jié)點(diǎn)數(shù)據(jù)

        //設(shè)置節(jié)點(diǎn)數(shù)據(jù)
        _client.setData().forPath("/zktest/ps1", data);
        //獲取節(jié)點(diǎn)數(shù)據(jù)
        byte[] data2 = _client.getData().forPath("/zktest/ps1");

        ?

        分布式鎖

        使用數(shù)據(jù)庫、Redis、文件系統(tǒng)都可以實(shí)現(xiàn)分布式鎖,同樣ZooKeeper也可以用來實(shí)現(xiàn)分布式鎖。Curator提供了InterProcessMutex類來幫助我們實(shí)現(xiàn)分布式鎖,其內(nèi)部就是使用的EPHEMERAL_SEQUENTIAL類型節(jié)點(diǎn)。

        ?

        public void test() throws Exception {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
        
            _client = CuratorFrameworkFactory.newClient("10.23.22.237:2181", retryPolicy);
            _client.start();
        
            ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        
            for (int i = 0; i < 5; i++) {
                fixedThreadPool.submit(new Runnable() {
        
                    @Override
                    public void run() {
        
                        while (true) {
                            try {
                                dowork();
                            } catch (Exception e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }
                    }
                });
            }
        }
        
        private void dowork() throws Exception {
        
            InterProcessMutex ipm = new InterProcessMutex(_client, "/zktest/distributed_lock");
        
            try {
                ipm.acquire();
        
                _logger.info("Thread ID:" + Thread.currentThread().getId() + " acquire the lock");
        
                Thread.sleep(1000);
        
                _logger.info("Thread ID:" + Thread.currentThread().getId() + " release the lock");
            } catch (Exception e) {
        
            } finally {
                ipm.release();
            }
        }


        執(zhí)行結(jié)果如下圖:

        Clipboard Image.png

        ?

        acquire()方法,會(huì)在給定的路徑下面創(chuàng)建臨時(shí)時(shí)序節(jié)點(diǎn)的時(shí)序節(jié)點(diǎn)。然后它會(huì)和父節(jié)點(diǎn)下面的其他節(jié)點(diǎn)比較時(shí)序。如果客戶端創(chuàng)建的臨時(shí)時(shí)序節(jié)點(diǎn)的數(shù)字后綴最小的話,則獲得該鎖,函數(shù)成功返回。如果沒有獲得到,即,創(chuàng)建的臨時(shí)節(jié)點(diǎn)數(shù)字后綴不是最小的,則啟動(dòng)一個(gè)watch監(jiān)聽上一個(gè)(排在前面一個(gè)的節(jié)點(diǎn))。主線程使用object.wait()進(jìn)行等待,等待watch觸發(fā)的線程notifyAll(),一旦上一個(gè)節(jié)點(diǎn)有事件產(chǎn)生馬上再次出發(fā)時(shí)序最小節(jié)點(diǎn)的判斷。

        release()方法就是釋放鎖,內(nèi)部實(shí)現(xiàn)就是刪除創(chuàng)建的EPHEMERAL_SEQUENTIAL節(jié)點(diǎn)。

        Leader選舉

        選舉可以用來實(shí)現(xiàn)Master-Slave模式,也可以用來實(shí)現(xiàn)主備切換等功能。Curator提供兩種方式實(shí)現(xiàn)選舉:LeaderSelector 和 LeaderLatch。兩種方法都可以使用,LeaderLatch語法較為簡單一點(diǎn),LeaderSelector控制度更高一些。

        使用LeaderSelector:

        public void test() {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
        
            _client = CuratorFrameworkFactory.newClient("10.23.22.237:2181", retryPolicy);
            _client.start();
        
            dowork();
        
        }
        
        private void dowork() {
        
            LeaderSelectorListener listener = new
        
            LeaderSelectorListenerAdapter() {
                public void takeLeadership(CuratorFramework client) throws Exception {
                    logger.info("Take the lead.");
        
                    Thread.sleep(10000);
        
                    logger.info("Relinquish the lead.");
                }
        
            };
        
            LeaderSelector selector = new LeaderSelector(_client, "/zktest/leader", listener);
            selector.autoRequeue();
            selector.start();
        }


        LeaderSelector的內(nèi)部使用分布式鎖InterProcessMutex實(shí)現(xiàn), 并且在LeaderSelector中添加一個(gè)Listener,當(dāng)獲取到鎖的時(shí)候執(zhí)行回調(diào)函數(shù)takeLeadership。函數(shù)執(zhí)行完成之后就調(diào)用InterProcessMutex.release()釋放鎖,也就是放棄Leader的角色。

        ?

        使用LeaderLatch:

        public void test() {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
        
            _client = CuratorFrameworkFactory.newClient("10.23.22.237:2181", retryPolicy);
            _client.start();
        
            dowork();
        
        }
        
        private void dowork() {
            LeaderLatch leader = new LeaderLatch(_client, "/zktest/leader");
            leader.addListener(new LeaderLatchListener() {
        
                @Override
                public void isLeader() {
                    // TODO Auto-generated method stub
                    logger.info("Take the lead.");
        
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
        
                    logger.info("Relinquish the lead.");
                }
        
                @Override
                public void notLeader() {
                    // TODO Auto-generated method stub
                    logger.info("I am not Leader");
                }
            });
        
            try {
                leader.start();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }


        同樣是實(shí)現(xiàn)Leader選舉的LeaderLatch并沒有通過InterProcessMutex實(shí)現(xiàn),它使用了原生的創(chuàng)建EPHEMERAL_SEQUENTIAL節(jié)點(diǎn)的功能再次實(shí)現(xiàn)了一遍。同樣的在isLeader方法中需要實(shí)現(xiàn)Leader的業(yè)務(wù)需求,但是一旦isLeader方法返回,就相當(dāng)于Leader角色放棄了,重新進(jìn)入選舉過程。



        作者:nick hao

        原文鏈接:https://www.cnblogs.com/haoxinyue/p/6561896.html

        本文轉(zhuǎn)自博客園網(wǎng),版權(quán)歸原作者所有。

        瀏覽 45
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            中文字幕日韩视频在线 | 欧美成人无码A片免费一区澳门 | 国产免费一区二区三区 | 韩国免费黄色片 | 国内av在线 | 乱偷尝禁果2中文字幕 | 国产寡妇亲子伦一区二区三区四区 | 影音资源在线免费观看 | 免费无码在线播放 | 激情五月久久 |