1. 同步組件CyclicBarrier源碼解析

        共 4022字,需瀏覽 9分鐘

         ·

        2023-06-25 22:37

        走過路過不要錯過

        點擊藍字關(guān)注我們


        CyclicBarrier概述

        CyclicBarrier可以理解為Cyclic + Barrier, 可循環(huán)使用 + 屏障嘛。

        • 之所以是Cyclic的,是因為當所有等待線程執(zhí)行完畢,并重置CyclicBarrier的狀態(tài)后它可以被重用。

        • 之所以叫Barrier,是因為線程調(diào)用await方法后就會被阻塞,阻塞點就叫做屏障點。

        可以讓一組線程全部到達一個屏障【同步點】,再全部沖破屏障,繼續(xù)向下執(zhí)行。

        案例學習

        public class CycleBarrierTest2 {

        private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(
        2, // 計數(shù)器的初始值
        new Runnable() { // 計數(shù)器值為0時需要執(zhí)行的任務(wù)
        @Override
        public void run () {
        System.out.println(Thread.currentThread() + " tripped ~");
        }
        }
        );

        public static void main (String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(new Runnable() {
        @SneakyThrows
        @Override
        public void run () {
        Thread thread = Thread.currentThread();
        System.out.println(thread + " step 1");
        cyclicBarrier.await();
        System.out.println(thread + " step 2");
        cyclicBarrier.await();
        System.out.println(thread + " step 3");
        }
        });

        executorService.submit(new Runnable() {
        @SneakyThrows
        @Override
        public void run () {
        Thread thread = Thread.currentThread();
        System.out.println(thread + " step 1");
        cyclicBarrier.await();
        System.out.println(thread + " step 2");
        cyclicBarrier.await();
        System.out.println(thread + " step 3");
        }
        });

        executorService.shutdown();
        }

        }


        測試結(jié)果如下:

        Thread[pool-1-thread-2,5,main] step 1
        Thread[pool-1-thread-1,5,main] step 1
        Thread[pool-1-thread-1,5,main] tripped ~
        Thread[pool-1-thread-1,5,main] step 2
        Thread[pool-1-thread-2,5,main] step 2
        Thread[pool-1-thread-2,5,main] tripped ~
        Thread[pool-1-thread-2,5,main] step 3
        Thread[pool-1-thread-1,5,main] step 3


        • 創(chuàng)建了一個CyclicBarrier,指定parties為2作為初始計數(shù)值,指定Runnable任務(wù)作為所有線程到達屏障點時需要執(zhí)行的任務(wù)。

        • 創(chuàng)建了一個大小為2的線程池,向線程池中提交兩個任務(wù),我們根據(jù)測試結(jié)果來說明這一過程。

        • thread2線程率先執(zhí)行await(),此時計數(shù)值減1,并不為0,因此thread2線程到達屏障點,陷入阻塞。

        • thread1線程之后執(zhí)行await(),此時計數(shù)值減1后為0,接著執(zhí)行構(gòu)造器中指定的任務(wù),打印tripped,執(zhí)行完后退出屏障點,喚醒thread2。

        • 可以看到并不是和CountdownLatch一樣是一次性的,而是可重復使用的,退出屏障點后,計數(shù)值又被設(shè)置為2,之后又重復之前的步驟。

        多個線程之間是相互等待的,加入當前計數(shù)器值為N,之后N-1個線程調(diào)用await方法都會達到屏障點而阻塞,只有當?shù)贜個線程調(diào)用await方法時,計數(shù)器值為0,第N個線程才會喚醒之前等待的所有線程,再一起向下執(zhí)行。

        CyclicBarrier是可復用的,所有線程達到屏障點之后,CyclicBarrier會被重置。

        類圖結(jié)構(gòu)及重要字段

        public class CyclicBarrier {

        private static class Generation {
        boolean broken = false;
        }

        /** 獨占鎖保證同步 */
        private final ReentrantLock lock = new ReentrantLock();
        /** condition實現(xiàn)等待通知機制 */
        private final Condition trip = lock.newCondition();
        /** 記錄線程個數(shù) */
        private final int parties;
        /* 達到屏障點執(zhí)行的任務(wù) */
        private final Runnable barrierCommand;
        /** The current generation */
        private Generation generation = new Generation();

        /**
        * 記錄仍在等待的parties數(shù)量, 每一代count都會從初始的parties遞減至0
        */

        private int count;

        // 指定barrierAction, 在線程達到屏障后,優(yōu)先執(zhí)行barrierAction
        public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
        }

        // 指定parties, 希望屏障攔截的線程數(shù)量
        public CyclicBarrier(int parties) {
        this(parties, null);
        }
        }


        • 基于ReentrantLock獨占鎖實現(xiàn)同步與等待通知機制,底層基于AQS。

        • int類型parties記錄線程個數(shù),表示多少線程調(diào)用await方法后,所有線程才會沖破屏障繼續(xù)向下運行。

        • int類型count初始化為parties,每當有線程調(diào)用await方法就遞減1,count為0表示所有線程到達屏障點。

        CyclicBarrier是可復用的,因此使用兩個變量記錄線程個數(shù),count變?yōu)?時,會將parties賦值給count,進行復用。

        • barrierCommand是所有線程到達屏障點后執(zhí)行的任務(wù)。

        • CyclicBarrier是可復用的,Generation用于標記更新?lián)Q代,generation內(nèi)部的broken變量用來記錄當前屏障是否被打破。

        本篇文章閱讀需要建立在一定獨占鎖,Condition條件機制的基礎(chǔ)之上,這邊推薦幾篇前置文章,可以瞅一眼:

        內(nèi)部類Generation及相關(guān)方法

        CyclicBarrier是可復用的,Generation用于標記更新?lián)Q代。

            // 屏障的每一次使用都會生成一個新的Generation實例: 可能是 tripped or reset
        private static class Generation {
        boolean broken = false;
        }


        void reset()

        更新?lián)Q代: 首先標記一下當前這代不用了, 然后換一個新的。

            public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
        breakBarrier(); // break掉當前的
        nextGeneration(); // 開啟一個新的
        } finally {
        lock.unlock();
        }
        }


        void breakBarrier()

        標記一下broken為true,喚醒一下await等待線程,重置count。

            private void breakBarrier() {
        // 標記broken 為true
        generation.broken = true;
        // 重置count
        count = parties;
        // 喚醒因await等待的線程
        trip.signalAll();
        }


        void nextGeneration()

        喚醒一下await等待線程,重置count,更新為下一代。

            private void nextGeneration() {
        // 喚醒因await等待的線程
        trip.signalAll();
        // 重置count,意味著下一代了
        count = parties;
        // 下一代了
        generation = new Generation();
        }


        int await()

        當前線程調(diào)用await方法時會阻塞,除非遇到以下幾種情況:

        1. 所有線程都達到了屏障點,也就是parties個線程都調(diào)用了await()方法,使count遞減至0。

        2. 其他線程調(diào)用了當前線程的interrupt()方法,中斷當前線程,拋出InterruptedException而返回。

        3. 與當前屏障關(guān)聯(lián)的Generation中的broken被設(shè)置為true,拋出BrokenBarrierException而返回。

        它內(nèi)部調(diào)用了int dowait(boolean timed, long nanos),詳細解析往下面翻哈。

            public int await() throws InterruptedException, BrokenBarrierException {
        try {
        return dowait(false, 0L);
        } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
        }
        }


        int await(long timeout, TimeUnit unit)

        相比于普通的await()方法,該方法增加了超時的控制,你懂的。

        增加了一項:如果超時了,返回false。

            public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
        BrokenBarrierException,
        TimeoutException {
        return dowait(true, unit.toNanos(timeout));
        }


        int dowait(boolean timed, long nanos)

        • 第一個參數(shù)為true,說明需要超時控制。

        • 第二個參數(shù)設(shè)置超時的時間。

            private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
        TimeoutException {
        // 獲取獨占鎖
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
        // 與當前屏障點關(guān)聯(lián)的Generation
        final Generation g = generation;
        // broken標志為true,則異常
        if (g.broken)
        throw new BrokenBarrierException();
        // 如果被打斷,則breakBarrier,并拋出異常
        if (Thread.interrupted()) {
        // 打破: 1 標記broken為true 2 重置count 3 喚醒await等待的線程
        breakBarrier();
        throw new InterruptedException();
        }
        int index = --count;
        // 說明已經(jīng)到達屏障點了
        if (index == 0) { // tripped
        boolean ranAction = false;
        try {
        final Runnable command = barrierCommand;
        // 執(zhí)行一下任務(wù)
        if (command != null)
        command.run();
        ranAction = true;
        // 更新: 1 喚醒await等待的線程 2 更新Generation
        nextGeneration();
        return 0;
        } finally {
        // 執(zhí)行失敗了,可能被打斷了
        if (!ranAction)
        breakBarrier();
        }
        }

        // loop until tripped, broken, interrupted, or timed out
        // 死循環(huán), 結(jié)束的情況有:到達屏障點, broken了, 中斷, 超時
        for (;;) {
        try {
        // 超時控制
        if (!timed)
        trip.await();
        else if (nanos > 0L)
        // awaitNanos阻塞一段時間
        nanos = trip.awaitNanos(nanos);
        } catch (InterruptedException ie) {
        if (g == generation && ! g.broken) {
        // 標記broken為true
        breakBarrier();
        throw ie;
        } else {
        // We're about to finish waiting even if we had not
        // been interrupted, so this interrupt is deemed to
        // "belong" to subsequent execution.
        Thread.currentThread().interrupt();
        }
        }
        // 正常被喚醒, 再次檢查當前這一代是否已經(jīng)標記了broken
        if (g.broken)
        throw new BrokenBarrierException();
        // 最后一個線程在等待線程醒來之前,已經(jīng)通過nextGeneration將generation更新
        if (g != generation)
        return index;

        if (timed && nanos <= 0L) {
        breakBarrier();
        throw new TimeoutException();
        }
        }
        } finally {
        lock.unlock();
        }
        }


        以parties為N為例,我們來看看這一流程。

        • 線程調(diào)用dowait方法后,首先會獲取獨占鎖lock。如果是前N-1個線程,由于index != 0,會在條件隊列中等待trip.await() or trip.awaitNanos(nanos),會相應(yīng)釋放鎖。

        • 第N個線程調(diào)用dowait之后,此時index == 0,將會執(zhí)行命令command.run(),然后調(diào)用nextGeneration()更新?lián)Q代,同時喚醒所有條件隊列中等待的N-1個線程。

        • 第N個線程釋放鎖,后續(xù)被喚醒的線程移入AQS隊列,陸續(xù)獲取鎖,釋放鎖。

        CyclicBarrier與CountDownLatch的區(qū)別

        • CountDownLatch基于AQS,state表示計數(shù)器的值,在構(gòu)造時指定。CyclicBarrier基于ReentrantLock獨占鎖與Condition條件機制實現(xiàn)屏障邏輯。

        • CountDownLatch的計數(shù)器只能使用一次,而CyclicBarrier的計數(shù)器可以使用reset()方法重置,可復用性能夠處理更為復雜【分段任務(wù)有序執(zhí)行】的業(yè)務(wù)場景。

        • CyclicBarrier還提供了其他有用的方法,如getNumberWaiting方法可以獲得CyclicBarrier阻塞的線程數(shù)量。isBroken()方法用來了解阻塞的線程是否被中斷。

        總結(jié)

        • CyclicBarrier = Cyclic + Barrier, 可重用 + 屏障,可以讓一組線程全部到達一個屏障【同步點】,再全部沖破屏障,繼續(xù)向下執(zhí)行。

        • CyclicBarrier基于ReentrantLock獨占鎖與Condition條件機制實現(xiàn)屏障邏輯。

        • CyclicBarrier需要指定parties【N】以及可選的任務(wù),當N - 1個線程調(diào)用await的時候,會在條件隊列中阻塞,直到第N個線程調(diào)用await,執(zhí)行指定的任務(wù)后,喚醒N - 1個等待的線程,并重置Generation,更新count。




        想進大廠的小伙伴請注意,

        大廠面試的套路很神奇,

        早做準備對大家更有好處,

        埋頭刷題效率低,

        看面經(jīng)會更有效率!

        小編準備了一份大廠常問面經(jīng)匯總集

        剩下的就不會給大家一展出來了,以上資料按照一下操作即可獲得


        ——將文章進行轉(zhuǎn)發(fā)評論,關(guān)注公眾號【Java烤豬皮】,關(guān)注后繼續(xù)后臺回復領(lǐng)取口令“?666?”即可免費領(lǐng)文章取中所提供的資料。




        往期精品推薦



        騰訊、阿里、滴滴后臺試題匯集總結(jié) — (含答案)

        面試:史上最全多線程序面試題!

        最新阿里內(nèi)推Java后端試題

        JVM難學?那是因為你沒有真正看完整這篇文章


        結(jié)束


        關(guān)注作者微信公眾號 —?《JAVA烤豬皮》


        了解了更多java后端架構(gòu)知識以及最新面試寶典



        看完本文記得給作者點贊+在看哦~~~大家的支持,是作者來源不斷出文的動力~

        瀏覽 50
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 最新一区二区三区 | 免费看成人女人毛片视频 | 91麻豆精品A片国产在线观看 | 草久久久久久久 | 国产精品无码永久免费不卡 |