1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        一文搞懂 CountDownLatch 用法和源碼!

        共 5693字,需瀏覽 12分鐘

         ·

        2020-12-27 02:56

        點擊藍色“Java建設者?”關注我喲

        加個“星標”,及時閱讀最新技術文章


        這是Java建設者的第130篇原創(chuàng)文章
        https://github.com/crisxuan/bestJavaer
        歡迎大佬們 star 我的 github?


        CountDownLatch?是多線程控制的一種工具,它被稱為?門閥、?計數(shù)器或者?閉鎖。這個工具經(jīng)常用來用來協(xié)調(diào)多個線程之間的同步,或者說起到線程之間的通信(而不是用作互斥的作用)。下面我們就來一起認識一下 CountDownLatch

        認識 CountDownLatch

        CountDownLatch 能夠使一個線程在等待另外一些線程完成各自工作之后,再繼續(xù)執(zhí)行。它相當于是一個計數(shù)器,這個計數(shù)器的初始值就是線程的數(shù)量,每當一個任務完成后,計數(shù)器的值就會減一,當計數(shù)器的值為 0 時,表示所有的線程都已經(jīng)任務了,然后在 CountDownLatch 上等待的線程就可以恢復執(zhí)行接下來的任務。

        CountDownLatch 的使用

        CountDownLatch 提供了一個構(gòu)造方法,你必須指定其初始值,還指定了?countDown?方法,這個方法的作用主要用來減小計數(shù)器的值,當計數(shù)器變?yōu)?0 時,在 CountDownLatch 上?await?的線程就會被喚醒,繼續(xù)執(zhí)行其他任務。當然也可以延遲喚醒,給 CountDownLatch 加一個延遲時間就可以實現(xiàn)。

        其主要方法如下

        CountDownLatch 主要有下面這幾個應用場景

        CountDownLatch 應用場景

        典型的應用場景就是當一個服務啟動時,同時會加載很多組件和服務,這時候主線程會等待組件和服務的加載。當所有的組件和服務都加載完畢后,主線程和其他線程在一起完成某個任務。

        CountDownLatch 還可以實現(xiàn)學生一起比賽跑步的程序,CountDownLatch 初始化為學生數(shù)量的線程,鳴槍后,每個學生就是一條線程,來完成各自的任務,當?shù)谝粋€學生跑完全程后,CountDownLatch 就會減一,直到所有的學生完成后,CountDownLatch 會變?yōu)?0 ,接下來再一起宣布跑步成績。

        順著這個場景,你自己就可以延伸、拓展出來很多其他任務場景。

        CountDownLatch 用法

        下面我們通過一個簡單的計數(shù)器來演示一下 CountDownLatch 的用法

        public?class?TCountDownLatch?{

        ????public?static?void?main(String[]?args)?{
        ????????CountDownLatch?latch?=?new?CountDownLatch(5);
        ????????Increment?increment?=?new?Increment(latch);
        ????????Decrement?decrement?=?new?Decrement(latch);

        ????????new?Thread(increment).start();
        ????????new?Thread(decrement).start();

        ????????try?{
        ????????????Thread.sleep(6000);
        ????????}?catch?(InterruptedException?e)?{
        ????????????e.printStackTrace();
        ????????}
        ????}
        }

        class?Decrement?implements?Runnable?{

        ????CountDownLatch?countDownLatch;

        ????public?Decrement(CountDownLatch?countDownLatch){
        ????????this.countDownLatch?=?countDownLatch;
        ????}

        ????@Override
        ????public?void?run()?{
        ????????try?{

        ????????????for(long?i?=?countDownLatch.getCount();i?>?0;i--){
        ????????????????Thread.sleep(1000);
        ????????????????System.out.println("countdown");
        ????????????????this.countDownLatch.countDown();
        ????????????}

        ????????}?catch?(InterruptedException?e)?{
        ????????????e.printStackTrace();
        ????????}
        ????}
        }


        class?Increment?implements?Runnable?{

        ????CountDownLatch?countDownLatch;

        ????public?Increment(CountDownLatch?countDownLatch){
        ????????this.countDownLatch?=?countDownLatch;
        ????}

        ????@Override
        ????public?void?run()?{
        ????????try?{
        ????????????System.out.println("await");
        ????????????countDownLatch.await();
        ????????}?catch?(InterruptedException?e)?{
        ????????????e.printStackTrace();
        ????????}
        ????????System.out.println("Waiter?Released");
        ????}
        }

        在 main 方法中我們初始化了一個計數(shù)器為 5 的 CountDownLatch,在 Decrement 方法中我們使用?countDown?執(zhí)行減一操作,然后睡眠一段時間,同時在 Increment 類中進行等待,直到 Decrement 中的線程完成計數(shù)減一的操作后,喚醒 Increment 類中的 run 方法,使其繼續(xù)執(zhí)行。

        下面我們再來通過學生賽跑這個例子來演示一下 CountDownLatch 的具體用法

        public?class?StudentRunRace?{

        ????CountDownLatch?stopLatch?=?new?CountDownLatch(1);
        ????CountDownLatch?runLatch?=?new?CountDownLatch(10);

        ????public?void?waitSignal()?throws?Exception{
        ????????System.out.println("選手"?+?Thread.currentThread().getName()?+?"正在等待裁判發(fā)布口令");
        ????????stopLatch.await();
        ????????System.out.println("選手"?+?Thread.currentThread().getName()?+?"已接受裁判口令");
        ????????Thread.sleep((long)?(Math.random()?*?10000));
        ????????System.out.println("選手"?+?Thread.currentThread().getName()?+?"到達終點");
        ????????runLatch.countDown();
        ????}

        ????public?void?waitStop()?throws?Exception{
        ????????Thread.sleep((long)?(Math.random()?*?10000));
        ????????System.out.println("裁判"+Thread.currentThread().getName()+"即將發(fā)布口令");
        ????????stopLatch.countDown();
        ????????System.out.println("裁判"+Thread.currentThread().getName()+"已發(fā)送口令,正在等待所有選手到達終點");
        ????????runLatch.await();
        ????????System.out.println("所有選手都到達終點");
        ????????System.out.println("裁判"+Thread.currentThread().getName()+"匯總成績排名");
        ????}

        ????public?static?void?main(String[]?args)?{
        ????????ExecutorService?service?=?Executors.newCachedThreadPool();
        ????????StudentRunRace?studentRunRace?=?new?StudentRunRace();
        ????????for?(int?i?=?0;?i?10;?i++)?{
        ????????????Runnable?runnable?=?()?->?{
        ????????????????try?{
        ????????????????????studentRunRace.waitSignal();
        ????????????????}?catch?(Exception?e)?{
        ????????????????????e.printStackTrace();
        ????????????????}
        ????????????};
        ????????????service.execute(runnable);
        ????????}
        ????????try?{
        ????????????studentRunRace.waitStop();
        ????????}?catch?(Exception?e)?{
        ????????????e.printStackTrace();
        ????????}
        ????????service.shutdown();
        ????}
        }

        下面我們就來一起分析一下?CountDownLatch?的源碼

        CountDownLatch 源碼分析

        CountDownLatch 使用起來比較簡單,但是卻非常有用,現(xiàn)在你可以在你的工具箱中加上 CountDownLatch 這個工具類了。下面我們就來深入認識一下 CountDownLatch。

        CountDownLatch 的底層是由?AbstractQueuedSynchronizer?支持,而 AQS 的數(shù)據(jù)結(jié)構(gòu)的核心就是兩個隊列,一個是?同步隊列(sync queue),一個是條件隊列(condition queue)。

        Sync 內(nèi)部類

        CountDownLatch 在其內(nèi)部是一個 Sync ,它繼承了 AQS 抽象類。

        private?static?final?class?Sync?extends?AbstractQueuedSynchronizer?{...}

        CountDownLatch 其實其內(nèi)部只有一個?sync?屬性,并且是 final 的

        private?final?Sync?sync;

        CountDownLatch 只有一個帶參數(shù)的構(gòu)造方法

        public?CountDownLatch(int?count)?{
        ??if?(count?0)?throw?new?IllegalArgumentException("count?);
        ??this.sync?=?new?Sync(count);
        }

        也就是說,初始化的時候必須指定計數(shù)器的數(shù)量,如果數(shù)量為負會直接拋出異常。

        然后把 count 初始化為 Sync 內(nèi)部的 count,也就是

        Sync(int?count)?{
        ??setState(count);
        }

        注意這里有一個 setState(count),這是什么意思呢?見聞知意這只是一個設置狀態(tài)的操作,但是實際上不單單是,還有一層意思是 state 的值代表著待達到條件的線程數(shù)。這個我們在聊 countDown 方法的時候再討論。

        getCount()?方法的返回值是?getState()?方法,它是 AbstractQueuedSynchronizer 中的方法,這個方法會返回當前線程計數(shù),具有 volatile 讀取的內(nèi)存語義。

        //?----?CountDownLatch?----

        int?getCount()?{
        ??return?getState();
        }

        //?----?AbstractQueuedSynchronizer?----

        protected?final?int?getState()?{
        ??return?state;
        }

        tryAcquireShared()?方法用于獲取·共享狀態(tài)下對象的狀態(tài),判斷對象是否為 0 ,如果為 0 返回 1 ,表示能夠嘗試獲取,如果不為 0,那么返回 -1,表示無法獲取。

        protected?int?tryAcquireShared(int?acquires)?{
        ??return?(getState()?==?0)???1?:?-1;
        }

        //?----??getState()?方法和上面的方法相同?----

        這個?共享狀態(tài)?屬于 AQS 中的概念,在 AQS 中分為兩種模式,一種是?獨占模式,一種是?共享模式

        • tryAcquire 獨占模式,嘗試獲取資源,成功則返回 true,失敗則返回 false。

        • tryAcquireShared 共享方式,嘗試獲取資源。負數(shù)表示失??;0 表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。

        tryReleaseShared()?方法用于共享模式下的釋放

        protected?boolean?tryReleaseShared(int?releases)?{
        ??//?減小數(shù)量,變?yōu)?0?的時候進行通知。
        ??for?(;;)?{
        ????int?c?=?getState();
        ????if?(c?==?0)
        ??????return?false;
        ????int?nextc?=?c-1;
        ????if?(compareAndSetState(c,?nextc))
        ??????return?nextc?==?0;
        ??}
        }

        這個方法是一個無限循環(huán),獲取線程狀態(tài),如果線程狀態(tài)是 0 則表示沒有被線程占有,沒有占有的話那么直接返回 false ,表示已經(jīng)釋放;然后下一個狀態(tài)進行 - 1 ,使用 compareAndSetState CAS 方法進行和內(nèi)存值的比較,如果內(nèi)存值也是 1 的話,就會更新內(nèi)存值為 0 ,判斷 nextc 是否為 0 ,如果 CAS 比較不成功的話,會再次進行循環(huán)判斷。

        如果 CAS 用法不清楚的話,讀者朋友們可以參考這篇文章

        告訴你一個 AtomicInteger 的驚天大秘密!

        await 方法

        await()?方法是 CountDownLatch 一個非常重要的方法,基本上可以說只有 countDown 和 await 方法才是 CountDownLatch 的精髓所在,這個方法將會使當前線程在 CountDownLatch 計數(shù)減至零之前一直等待,除非線程被中斷。

        CountDownLatch 中的 await 方法有兩種,一種是不帶任何參數(shù)的?await(),一種是可以等待一段時間的await(long timeout, TimeUnit unit)。下面我們先來看一下 await() 方法。

        public?void?await()?throws?InterruptedException?{
        ??sync.acquireSharedInterruptibly(1);
        }

        await 方法內(nèi)部會調(diào)用 acquireSharedInterruptibly 方法,這個 acquireSharedInterruptibly 是 AQS 中的方法,以共享模式進行中斷。

        public?final?void?acquireSharedInterruptibly(int?arg)
        ??throws?InterruptedException?
        {
        ??if?(Thread.interrupted())
        ????throw?new?InterruptedException();
        ??if?(tryAcquireShared(arg)?0)
        ????doAcquireSharedInterruptibly(arg);
        }

        可以看到,acquireSharedInterruptibly 方法的內(nèi)部會首先判斷線程是否中斷,如果線程中斷,則直接拋出線程中斷異常。如果沒有中斷,那么會以共享的方式獲取。如果能夠在共享的方式下不能獲取鎖,那么就會以共享的方式斷開鏈接。

        private?void?doAcquireSharedInterruptibly(int?arg)
        ??throws?InterruptedException?
        {
        ??final?Node?node?=?addWaiter(Node.SHARED);
        ??boolean?failed?=?true;
        ??try?{
        ????for?(;;)?{
        ??????final?Node?p?=?node.predecessor();
        ??????if?(p?==?head)?{
        ????????int?r?=?tryAcquireShared(arg);
        ????????if?(r?>=?0)?{
        ??????????setHeadAndPropagate(node,?r);
        ??????????p.next?=?null;?//?help?GC
        ??????????failed?=?false;
        ??????????return;
        ????????}
        ??????}
        ??????if?(shouldParkAfterFailedAcquire(p,?node)?&&
        ??????????parkAndCheckInterrupt())
        ????????throw?new?InterruptedException();
        ????}
        ??}?finally?{
        ????if?(failed)
        ??????cancelAcquire(node);
        ??}
        }

        這個方法有些長,我們分開來看

        • 首先,會先構(gòu)造一個共享模式的 Node 入隊

        • 然后使用無限循環(huán)判斷新構(gòu)造 node 的前驅(qū)節(jié)點,如果 node 節(jié)點的前驅(qū)節(jié)點是頭節(jié)點,那么就會判斷線程的狀態(tài),這里調(diào)用了一個 setHeadAndPropagate ,其源碼如下

        private?void?setHeadAndPropagate(Node?node,?int?propagate)?{
        ??Node?h?=?head;?
        ??setHead(node);
        ??if?(propagate?>?0?||?h?==?null?||?h.waitStatus?0?||
        ??????(h?=?head)?==?null?||?h.waitStatus?0)?{
        ????Node?s?=?node.next;
        ????if?(s?==?null?||?s.isShared())
        ??????doReleaseShared();
        ??}
        }

        首先會設置頭節(jié)點,然后進行一系列的判斷,獲取節(jié)點的獲取節(jié)點的后繼,以共享模式進行釋放,就會調(diào)用 doReleaseShared 方法,我們再來看一下 doReleaseShared 方法

        private?void?doReleaseShared()?{

        ??for?(;;)?{
        ????Node?h?=?head;
        ????if?(h?!=?null?&&?h?!=?tail)?{
        ??????int?ws?=?h.waitStatus;
        ??????if?(ws?==?Node.SIGNAL)?{
        ????????if?(!compareAndSetWaitStatus(h,?Node.SIGNAL,?0))
        ??????????continue;????????????//?loop?to?recheck?cases
        ????????unparkSuccessor(h);
        ??????}
        ??????else?if?(ws?==?0?&&
        ???????????????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE))
        ????????continue;????????????????//?loop?on?failed?CAS
        ????}
        ????if?(h?==?head)???????????????????//?loop?if?head?changed
        ??????break;
        ??}
        }

        這個方法會以無限循環(huán)的方式首先判斷頭節(jié)點是否等于尾節(jié)點,如果頭節(jié)點等于尾節(jié)點的話,就會直接退出。如果頭節(jié)點不等于尾節(jié)點,會判斷狀態(tài)是否為 SIGNAL,不是的話就繼續(xù)循環(huán) compareAndSetWaitStatus,然后斷開后繼節(jié)點。如果狀態(tài)不是 SIGNAL,也會調(diào)用 compareAndSetWaitStatus 設置狀態(tài)為 PROPAGATE,狀態(tài)為 0 并且不成功,就會繼續(xù)循環(huán)。

        也就是說 setHeadAndPropagate 就是設置頭節(jié)點并且釋放后繼節(jié)點的一系列過程。

        • 我們來看下面的 if 判斷,也就是?shouldParkAfterFailedAcquire(p, node)?這里

        if?(shouldParkAfterFailedAcquire(p,?node)?&&
        ????parkAndCheckInterrupt())
        ??throw?new?InterruptedException();

        如果上面 Node p = node.predecessor() 獲取前驅(qū)節(jié)點不是頭節(jié)點,就會進行 park 斷開操作,判斷此時是否能夠斷開,判斷的標準如下

        private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{
        ??int?ws?=?pred.waitStatus;
        ??if?(ws?==?Node.SIGNAL)
        ????return?true;
        ??if?(ws?>?0)?{
        ????do?{
        ??????node.prev?=?pred?=?pred.prev;
        ????}?while?(pred.waitStatus?>?0);
        ????pred.next?=?node;
        ??}?else?{
        ????compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);
        ??}
        ??return?false;
        }

        這個方法會判斷 Node p 的前驅(qū)節(jié)點的結(jié)點狀態(tài)(waitStatus),節(jié)點狀態(tài)一共有五種,分別是

        1. CANCELLED(1):表示當前結(jié)點已取消調(diào)度。當超時或被中斷(響應中斷的情況下),會觸發(fā)變更為此狀態(tài),進入該狀態(tài)后的結(jié)點將不會再變化。

        2. SIGNAL(-1):表示后繼結(jié)點在等待當前結(jié)點喚醒。后繼結(jié)點入隊時,會將前繼結(jié)點的狀態(tài)更新為 SIGNAL。

        3. CONDITION(-2):表示結(jié)點等待在 Condition 上,當其他線程調(diào)用了 Condition 的 signal() 方法后,CONDITION狀態(tài)的結(jié)點將從等待隊列轉(zhuǎn)移到同步隊列中,等待獲取同步鎖。

        4. PROPAGATE(-3):共享模式下,前繼結(jié)點不僅會喚醒其后繼結(jié)點,同時也可能會喚醒后繼的后繼結(jié)點。

        5. 0:新結(jié)點入隊時的默認狀態(tài)。

        如果前驅(qū)節(jié)點是 SIGNAL 就會返回 true 表示可以斷開,如果前驅(qū)節(jié)點的狀態(tài)大于 0 (此時為什么不用 ws == Node.CANCELLED ) 呢?因為 ws 大于 0 的條件只有 CANCELLED 狀態(tài)了。然后就是一系列的查找遍歷操作直到前驅(qū)節(jié)點的 waitStatus > 0。如果 ws <= 0 ,而且還不是 SIGNAL 狀態(tài)的話,就會使用 CAS 替換前驅(qū)節(jié)點的 ws 為 SIGNAL 狀態(tài)。

        如果檢查判斷是中斷狀態(tài)的話,就會返回 false。

        private?final?boolean?parkAndCheckInterrupt()?{
        ??LockSupport.park(this);
        ??return?Thread.interrupted();
        }

        這個方法使用?LockSupport.park?斷開連接,然后返回線程是否中斷的標志。

        • cancelAcquire()?用于取消等待隊列,如果等待過程中沒有成功獲取資源(如timeout,或者可中斷的情況下被中斷了),那么取消結(jié)點在隊列中的等待。

        private?void?cancelAcquire(Node?node)?{
        ??if?(node?==?null)
        ????return;

        ??node.thread?=?null;

        ??Node?pred?=?node.prev;
        ??while?(pred.waitStatus?>?0)
        ????node.prev?=?pred?=?pred.prev;

        ??Node?predNext?=?pred.next;

        ??node.waitStatus?=?Node.CANCELLED;

        ??if?(node?==?tail?&&?compareAndSetTail(node,?pred))?{
        ????compareAndSetNext(pred,?predNext,?null);
        ??}?else?{
        ????int?ws;
        ????if?(pred?!=?head?&&
        ????????((ws?=?pred.waitStatus)?==?Node.SIGNAL?||
        ?????????(ws?<=?0?&&?compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL)))?&&
        ????????pred.thread?!=?null)?{
        ??????Node?next?=?node.next;
        ??????if?(next?!=?null?&&?next.waitStatus?<=?0)
        ????????compareAndSetNext(pred,?predNext,?next);
        ????}?else?{
        ??????unparkSuccessor(node);
        ????}
        ????node.next?=?node;?//?help?GC
        ??}
        }

        所以,對 CountDownLatch 的 await 調(diào)用大致會有如下的調(diào)用過程。

        一個和 await 重載的方法是?await(long timeout, TimeUnit unit),這個方法和 await 最主要的區(qū)別就是這個方法能夠可以等待計數(shù)器一段時間再執(zhí)行后續(xù)操作。

        countDown 方法

        countDown 是和 await 同等重要的方法,countDown 用于減少計數(shù)器的數(shù)量,如果計數(shù)減為 0 的話,就會釋放所有的線程。

        public?void?countDown()?{
        ??sync.releaseShared(1);
        }

        這個方法會調(diào)用 releaseShared 方法,此方法用于共享模式下的釋放操作,首先會判斷是否能夠進行釋放,判斷的方法就是 CountDownLatch 內(nèi)部類 Sync 的 tryReleaseShared 方法

        public?final?boolean?releaseShared(int?arg)?{
        ??if?(tryReleaseShared(arg))?{
        ????doReleaseShared();
        ????return?true;
        ??}
        ??return?false;
        }

        //?----?CountDownLatch?----

        protected?boolean?tryReleaseShared(int?releases)?{
        ??for?(;;)?{
        ????int?c?=?getState();
        ????if?(c?==?0)
        ??????return?false;
        ????int?nextc?=?c-1;
        ????if?(compareAndSetState(c,?nextc))
        ??????return?nextc?==?0;
        ??}
        }

        tryReleaseShared 會進行 for 循環(huán)判斷線程狀態(tài)值,使用 CAS 不斷嘗試進行替換。

        如果能夠釋放,就會調(diào)用 doReleaseShared 方法

        private?void?doReleaseShared()?{
        ??for?(;;)?{
        ????Node?h?=?head;
        ????if?(h?!=?null?&&?h?!=?tail)?{
        ??????int?ws?=?h.waitStatus;
        ??????if?(ws?==?Node.SIGNAL)?{
        ????????if?(!compareAndSetWaitStatus(h,?Node.SIGNAL,?0))
        ??????????continue;????????????//?loop?to?recheck?cases
        ????????unparkSuccessor(h);
        ??????}
        ??????else?if?(ws?==?0?&&
        ???????????????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE))
        ????????continue;????????????????//?loop?on?failed?CAS
        ????}
        ????if?(h?==?head)???????????????????//?loop?if?head?changed
        ??????break;
        ??}
        }

        可以看到,doReleaseShared 其實也是一個無限循環(huán)不斷使用 CAS 嘗試替換的操作。

        總結(jié)

        本文是 CountDownLatch 的基本使用和源碼分析,CountDownLatch 就是一個基于 AQS 的計數(shù)器,它內(nèi)部的方法都是圍繞 AQS 框架來談的,除此之外還有其他比如 ReentrantLock、Semaphore 等都是 AQS 的實現(xiàn),所以要研究并發(fā)的話,離不開對 AQS 的探討。CountDownLatch 的源碼看起來很少,比較簡單,但是其內(nèi)部比如 await 方法的調(diào)用鏈路卻很長,也值得花費時間深入研究。

        我是 cxuan,一枚技術創(chuàng)作的程序員。如果本文你覺得不錯的話,跪求讀者點贊、在看、分享!


        瀏覽 49
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            91九色熟女 | 一道本无码在线播放 | 韩国三级bd高清中字电影 | 影音先锋中文字幕一区二区 | 成人三级视频在线观看视频在线 | 国产TS人妖系列高潮 | 国产女人18毛片18精品 | 欧美婷婷精品激情 | 全免费A级毛片免费看无码视频 | 啊灬啊灬啊灬快灬高潮了 |