1. CyclicBarrier 源碼一探究竟

        共 6168字,需瀏覽 13分鐘

         ·

        2021-02-09 20:10


        點(diǎn)擊上方?藍(lán)字?關(guān)注我們!



        Java,Python,C/C++,Linux,PHP,Go,C#,QT,大數(shù)據(jù),算法,軟件教程,前端,簡歷,畢業(yè)設(shè)計等分類,資源在不斷更新中... 點(diǎn)擊領(lǐng)取!

        每天 11 點(diǎn)更新文章,餓了點(diǎn)外賣,點(diǎn)擊 ??《無門檻外賣優(yōu)惠券,每天免費(fèi)領(lǐng)!》

        1、學(xué)習(xí)切入點(diǎn)

        百度翻譯大概意思就是:

        一種同步輔助程序,允許一組線程相互等待到達(dá)一個公共的屏障點(diǎn)。CyclicBarrier在涉及固定大小的線程方的程序中非常有用,這些線程方有時必須相互等待。這個屏障被稱為循環(huán)屏障,因?yàn)樗梢栽诘却木€程被釋放后重新使用。

        CyclicBarrier支持可選的Runnable命令,該命令在參與方中的最后一個線程到達(dá)后,但在釋放任何線程之前,每個屏障點(diǎn)運(yùn)行一次。此屏障操作有助于在任何參與方繼續(xù)之前更新共享狀態(tài)。

        動圖演示:

        在上文中我們分析完了 CountDownLatch源碼,可以理解為減法計數(shù)器,是基于AQS的共享模式使用,而CyclicBarrier相比于CountDownLatch 來說,要簡單很多,它類似于加法計數(shù)器,在源碼中使用?ReentrantLock 和 Condition 的組合來使用。

        2、案例演示 CyclicBarrier

        //加法計數(shù)器
        public?class?CyclicBarrierDemo?{
        ????public?static?void?main(String[]?args)?{
        ????????/**
        ?????????*?集齊5名隊員,開始游戲
        ?????????*/

        ????????//?開始戰(zhàn)斗的線程
        ????????CyclicBarrier?cyclicBarrier?=?new?CyclicBarrier(5,()->{
        ????????????System.out.println("歡迎來到王者榮耀,敵軍還有五秒到達(dá)戰(zhàn)場!全軍出擊!");
        ????????});
        ????????for?(int?i?=?1;?i?<=5?;?i++)?{
        ????????????final?int?temp?=?i;
        ????????????//?lambda能操作到?i?嗎
        ????????????new?Thread(()->{
        ????????????????System.out.println(Thread.currentThread().getName()+"第"+temp+"個進(jìn)入游戲!");
        ????????????????try?{
        ????????????????????cyclicBarrier.await();?//?等待
        ????????????????}?catch?(InterruptedException?e)?{
        ????????????????????e.printStackTrace();
        ????????????????}?catch?(BrokenBarrierException?e)?{
        ????????????????????e.printStackTrace();
        ????????????????}
        ????????????}).start();
        ????????}
        ????}
        }

        3、入手構(gòu)造器

        //構(gòu)造器1
        /**?創(chuàng)建一個新的CyclicBarrier,它將在給定數(shù)量的參與方(線程)等待時觸發(fā),并在觸發(fā)屏障時執(zhí)行給定的屏障操作,由最后一個進(jìn)入屏障的線程執(zhí)行?*/???
        public?CyclicBarrier(int?parties,?Runnable?barrierAction)?{
        ????????if?(parties?<=?0)?throw?new?IllegalArgumentException();
        ????????this.parties?=?parties;
        ????????this.count?=?parties;
        ????????this.barrierCommand?=?barrierAction;
        ????}

        //構(gòu)造器2
        /**?創(chuàng)建一個新的CyclicBarrier,當(dāng)給定數(shù)量的參與方(線程)在等待它時,它將跳閘,并且在屏障跳閘時不執(zhí)行預(yù)定義的操作?*/
        public?CyclicBarrier(int?parties)?{
        ????????this(parties,?null);
        ????}

        其中構(gòu)造器1為核心構(gòu)造器,在這里你可以指定?parties?本局游戲的參與者的數(shù)量(要攔截的線程數(shù))以及?barrierAction?本局游戲結(jié)束時要執(zhí)行的任務(wù)。

        3、入手成員變量

        ???/**?同步操作鎖?*/
        ????private?final?ReentrantLock?lock?=?new?ReentrantLock();
        ????/**?線程攔截器?Condition維護(hù)了一個阻塞隊列*/
        ????private?final?Condition?trip?=?lock.newCondition();
        ????/**?每次攔截的線程數(shù)?*/
        ????private?final?int?parties;
        ????/*?換代前執(zhí)行的任務(wù)?*/
        ????private?final?Runnable?barrierCommand;
        ????/**?表示柵欄的當(dāng)前代?類似代表本局游戲*/
        ????private?Generation?generation?=?new?Generation();
        ????/**?計數(shù)器?*/
        ????private?int?count;
        ????/**?靜態(tài)內(nèi)部類Generation??*/
        ????private?static?class?Generation?{
        ????????boolean?broken?=?false;
        ????}

        3、入手核心方法

        3.1、【await】方法源碼分析

        下面分析這兩個方法,分別為【非定時等待】和【定時等待】!

        //非定時等待
        public?int?await()?throws?InterruptedException,?BrokenBarrierException?{
        ????????try?{
        ????????????return?dowait(false,?0L);
        ????????}?catch?(TimeoutException?toe)?{
        ????????????throw?new?Error(toe);?//?cannot?happen
        ????????}
        ????}
        //定時等待
        public?int?await(long?timeout,?TimeUnit?unit)throws?InterruptedException,
        ??????????????BrokenBarrierException,
        ??????????????TimeoutException?
        {
        ???????return?dowait(true,?unit.toNanos(timeout));
        ???}

        可以看到,最終兩個方法都走【dowait】 方法,只不過參數(shù)不同。下面我們重點(diǎn)看看這個方法到底做了哪些事情。

        //核心等待方法
        ?private?int?dowait(boolean?timed,?long?nanos)
        ????????throws?InterruptedException,?BrokenBarrierException,
        ???????????????TimeoutException?
        {
        ????????final?ReentrantLock?lock?=?this.lock;
        ????????lock.lock();//加鎖操作
        ????????try?{
        ????????????final?Generation?g?=?generation;
        ????????????//檢查當(dāng)前柵欄是否被打翻
        ????????????if?(g.broken)
        ????????????????throw?new?BrokenBarrierException();
        ????????????//檢查當(dāng)前線程是否被中斷
        ????????????if?(Thread.interrupted())?{
        ????????????????breakBarrier();
        ????????????????throw?new?InterruptedException();
        ????????????}
        ????????????//每次都將計數(shù)器的值-1
        ????????????int?index?=?--count;
        ????????????//計數(shù)器的值減為0,則需要喚醒所有線程并轉(zhuǎn)換到下一代
        ????????????if?(index?==?0)?{??//?tripped
        ????????????????boolean?ranAction?=?false;
        ????????????????try?{
        ????????????????????//喚醒所有線程前先執(zhí)行指定的任務(wù)
        ????????????????????final?Runnable?command?=?barrierCommand;
        ????????????????????if?(command?!=?null)
        ????????????????????????command.run();
        ????????????????????ranAction?=?true;
        ????????????????????//喚醒所有線程并轉(zhuǎn)換到下一代
        ????????????????????nextGeneration();
        ????????????????????return?0;
        ????????????????}?finally?{
        ????????????????????//確保在任務(wù)未成功執(zhí)行時能將所有線程喚醒
        ????????????????????if?(!ranAction)
        ????????????????????????breakBarrier();
        ????????????????}
        ????????????}
        ????????????//如果計數(shù)器不為0?則執(zhí)行此循環(huán)
        ????????????//?loop?until?tripped,?broken,?interrupted,?or?timed?out
        ????????????for?(;;)?{
        ????????????????try?{
        ????????????????????//根據(jù)傳入的參數(shù)來覺得是定時等待還是非定時等待
        ????????????????????if?(!timed)
        ????????????????????????//如果沒有時間限制,則直接等待,直到被喚醒
        ????????????????????????trip.await();
        ????????????????????else?if?(nanos?>?0L)
        ????????????????????????//如果有時間限制,則等待指定時間
        ????????????????????????nanos?=?trip.awaitNanos(nanos);
        ????????????????}?catch?(InterruptedException?ie)?{
        ????????????????????//若當(dāng)前線程在等待期間被中斷則打翻柵欄喚醒其它線程
        ????????????????????if?(g?==?generation?&&?!?g.broken)?{
        ????????????????????????breakBarrier();
        ????????????????????????throw?ie;
        ????????????????????}?else?{
        ????????????????????????//?若在捕獲中斷異常前已經(jīng)完成在柵欄上的等待,則直接調(diào)用中斷操作
        ????????????????????????Thread.currentThread().interrupt();
        ????????????????????}
        ????????????????}
        ????????????????//如果線程因?yàn)榇蚍瓥艡诓僮鞫粏拘褎t拋出異常
        ????????????????if?(g.broken)
        ????????????????????throw?new?BrokenBarrierException();
        ????????????????//如果線程因?yàn)閾Q代操作而被喚醒則返回計數(shù)器的值
        ????????????????if?(g?!=?generation)
        ????????????????????return?index;
        ????????????????//如果線程因?yàn)闀r間到了而被喚醒則打翻柵欄并拋出異常
        ????????????????if?(timed?&&?nanos?<=?0L)?{
        ????????????????????breakBarrier();
        ????????????????????throw?new?TimeoutException();
        ????????????????}
        ????????????}
        ????????}?finally?{
        ????????????lock.unlock();//最終解鎖
        ????????}
        ????}

        分兩步分析,首先計數(shù)器的值減為0的情況,和計數(shù)器不為0的情況,首先第一種情況下:

        img

        第二種情況,計數(shù)器不為0,則進(jìn)入自旋for(;;):

        多線程同時并發(fā)訪問,如何阻塞當(dāng)前線程?

        我們翻看源碼,這里就看一下沒有時間限制的【trip.await】方法:

        整個await的過程:

        1、將當(dāng)前線程加入到Condition鎖隊列中。特別主要要區(qū)分AQS的等待隊列,這里進(jìn)入的是Condition的FIFO隊列

        2、釋放鎖。這里可以看到【fullyRelease】將鎖釋放了,否則【acquireQueued(node, savedState)】別的線程就無法拿到鎖而發(fā)生死鎖。

        3、自旋(while)掛起,直到被喚醒或者超時或者CACELLED等。

        4、獲取鎖【acquireQueued】方法,并將自己從Condition的FIFO隊列中釋放,表面自己不再需要鎖(我已經(jīng)有鎖了)

        3.2、Condition 隊列與AQS等待隊列 補(bǔ)充

        AQS等待隊列與Condition隊列是兩個相互獨(dú)立的隊列,【await】就是在當(dāng)前線程持有鎖的基礎(chǔ)上釋放鎖資源,并新建Condition節(jié)點(diǎn)加入到Condition隊列尾部,阻塞當(dāng)前線程?!緎ignal】就是將當(dāng)前Condition的頭結(jié)點(diǎn)移動到AQS等待隊列節(jié)點(diǎn)尾部,讓其等待再次獲取鎖。下面畫圖演示區(qū)別:

        節(jié)點(diǎn)1執(zhí)行Condition.await()->(1)將head后移 ->(2)釋放節(jié)點(diǎn)1的鎖并從AQS等待隊列中移除->(3)將節(jié)點(diǎn)1加入到Condition的等待隊列中->(4)更新lastWrite為節(jié)點(diǎn)1

        節(jié)點(diǎn)2執(zhí)行signal()操作->(1)將firstWrite后移->(2)將節(jié)點(diǎn)4移出Condition隊列->(3)將節(jié)點(diǎn)4加入到AQS的等待隊列中去->(4)更新AQS等待隊列的tail

        3.3、總結(jié):

        一、Condition的數(shù)據(jù)結(jié)構(gòu):

        我們知道一個Condition可以在多個地方被await(),那么就需要一個FIFO的結(jié)構(gòu)將這些Condition串聯(lián)起來,然后根據(jù)需要喚醒一個或者多個(通常是所有)。所以在Condition內(nèi)部就需要一個FIFO的隊列。private transient Node firstWaiter; private transient Node lastWaiter;上面的兩個節(jié)點(diǎn)就是描述一個FIFO的隊列。我們再結(jié)合前面提到的節(jié)點(diǎn)(Node)數(shù)據(jù)結(jié)構(gòu)。我們就發(fā)現(xiàn)Node.nextWaiter就派上用場了!nextWaiter就是將一系列的Condition.await 串聯(lián)起來組成一個FIFO的隊列。

        二、線程何時阻塞和釋放

        阻塞:await()方法中,在線程釋放鎖資源之后,如果節(jié)點(diǎn)不在AQS等待隊列,則阻塞當(dāng)前線程,如果在等待隊列,則自旋等待嘗試獲取鎖 釋放:signal()后,節(jié)點(diǎn)會從condition隊列移動到AQS等待隊列,則進(jìn)入正常鎖的獲取流程。

        3.4、【signalAll】signalAll源碼分析

        signalAll】方法,喚醒所有在Condition阻塞隊列中的線程

        private?void?breakBarrier()?{
        ????????generation.broken?=?true;
        ????????count?=?parties;
        ????????trip.signalAll();//喚醒Condition中等待的線程
        ????}

        public?final?void?signalAll()?{
        ????????????if?(!isHeldExclusively())
        ????????????????throw?new?IllegalMonitorStateException();
        ????????????Node?first?=?firstWaiter;
        ????????????if?(first?!=?null)
        ????????????????doSignalAll(first);
        ?????}
        /**?這個方法相當(dāng)于把Condition隊列中的所有Node全部取出插入到等待隊列中去?*/
        private?void?doSignalAll(Node?first)?{
        ????????????lastWaiter?=?firstWaiter?=?null;
        ????????????do?{
        ????????????????Node?next?=?first.nextWaiter;
        ????????????????first.nextWaiter?=?null;
        ????????????????transferForSignal(first);
        ????????????????first?=?next;
        ????????????}?while?(first?!=?null);
        ???????}
        /**?將節(jié)點(diǎn)從條件隊列傳輸?shù)酵疥犃蠥QS的等待隊列中?*/
        final?boolean?transferForSignal(Node?node)?{
        ????????//核心添加節(jié)點(diǎn)到AQS隊列方法
        ????????Node?p?=?enq(node);
        ????????int?ws?=?p.waitStatus;
        ????????if?(ws?>?0?||?!compareAndSetWaitStatus(p,?ws,?Node.SIGNAL))
        ????????????LockSupport.unpark(node.thread);
        ????????return?true;
        ????}
        /**?使用CAS+自旋方式插入節(jié)點(diǎn)到等待隊列,如果隊列為空,則初始化隊列?*/
        private?Node?enq(final?Node?node)?{
        ????????for?(;;)?{
        ????????????Node?t?=?tail;
        ????????????if?(t?==?null)?{?//?Must?initialize
        ????????????????if?(compareAndSetHead(new?Node()))
        ????????????????????tail?=?head;
        ????????????}?else?{
        ????????????????node.prev?=?t;
        ????????????????if?(compareAndSetTail(t,?node))?{
        ????????????????????t.next?=?node;
        ????????????????????return?t;
        ????????????????}
        ????????????}
        ????????}

        3.5、【reset】方法源碼分析

        最后,我們來看看怎么重置一個柵欄:

        將屏障重置為初始狀態(tài)。如果任何一方目前在隔離墻等候,他們將帶著BrokenBarrierException返回。請注意,由于其他原因發(fā)生中斷后的重置可能很復(fù)雜;線程需要以其他方式重新同步,并選擇一種方式執(zhí)行重置。最好是創(chuàng)建一個新的屏障供以后使用

        ????public?void?reset()?{
        ????????final?ReentrantLock?lock?=?this.lock;
        ????????lock.lock();
        ????????try?{
        ????????????breakBarrier();???//?break?the?current?generation
        ????????????nextGeneration();?//?start?a?new?generation
        ????????}?finally?{
        ????????????lock.unlock();
        ????????}
        ????}

        測試reset代碼:

        首先,打破柵欄,那意味著所有等待的線程(5個等待的線程)會喚醒,【await 】方法會通過拋出【BrokenBarrierException】異常返回。然后開啟新一代,重置了 count 和 generation,相當(dāng)于一切歸0了。

        4、CyclicBarrier 與 CountDownLatch 的區(qū)別

        相同點(diǎn):

        1、都可以實(shí)現(xiàn)一組線程在到達(dá)某個條件之前進(jìn)行等待

        2、它們內(nèi)部都有一個計數(shù)器,當(dāng)計數(shù)器的值不斷減為0的時候,所有阻塞的線程都會被喚醒!

        不同點(diǎn):

        1、CyclicBarrier 的計數(shù)器是由它自己來控制,而CountDownLatch 的計數(shù)器則是由使用則來控制

        2、在CyclicBarrier 中線程調(diào)用 await方法不僅會將自己阻塞,還會將計數(shù)器減1,而在CountDownLatch中線程調(diào)用 await方法只是將自己阻塞而不會減少計數(shù)器的值。

        3、另外,CountDownLatch 只能攔截一輪,而CyclicBarrier 可以實(shí)現(xiàn)循環(huán)攔截。一般來說CyclicBarrier 可以實(shí)現(xiàn) CountDownLatch的功能,而反之不能。

        5、總結(jié):

        當(dāng)調(diào)用【cyclicBarrier.await】方法時,最終都會執(zhí)行【dowait】方法,使用了ReentrantLock去上鎖,每次講計數(shù)器count值-1,當(dāng)計數(shù)器值-1為0的時候,會先執(zhí)行指定任務(wù),調(diào)用Condition的【trip.signalAll()】喚醒所有線程并進(jìn)入下一代

        如果當(dāng)前計數(shù)器值-1不為0的時候,進(jìn)入自旋,執(zhí)行Condition的【await()】方法,將當(dāng)前線程添加到Condition的條件隊列中等待,執(zhí)行【fullyRelease】調(diào)用【tryRelease】將count值-1,再判斷count值是否為0,為0 則會先執(zhí)行指定任務(wù),調(diào)用Condition的【trip.signalAll()】喚醒所有線程并進(jìn)入下一代,再判斷是否在AQS等待隊列中,如果不在的話就park當(dāng)前線程進(jìn)入AQS等待隊列中,否則自旋直到被喚醒在Condition中的等待隊列被signalAll進(jìn)入AQS等待隊列中獲取鎖





        往期推薦

        面了個女程序媛,也就問了5個問題,但她好像被我虐了

        Jar 包依賴沖突排查思路和解決方法

        領(lǐng)個紅包給自己加個餐!

        華為員工工資表曝光,我心動了,牛逼的人是天生的?


        看完文章,餓了點(diǎn)外賣,點(diǎn)擊 ??《無門檻外賣優(yōu)惠券,每天免費(fèi)領(lǐng)!》

        END



        若覺得文章對你有幫助,隨手轉(zhuǎn)發(fā)分享,也是我們繼續(xù)更新的動力。


        長按二維碼,掃掃關(guān)注哦

        ?「C語言中文網(wǎng)」官方公眾號,關(guān)注手機(jī)閱讀教程??


        必備編程學(xué)習(xí)資料


        目前收集的資料包括:?Java,Python,C/C++,Linux,PHP,go,C#,QT,git/svn,人工智能,大數(shù)據(jù),單片機(jī),算法,小程序,易語言,安卓,ios,PPT,軟件教程,前端,軟件測試,簡歷,畢業(yè)設(shè)計,公開課?等分類,資源在不斷更新中...


        點(diǎn)擊“閱讀原文”,立即免費(fèi)領(lǐng)取最新資料!
        ??????
        瀏覽 40
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
          
          

            1. 欧美成人精品一二三区欧美风情 | 久久影院一二三区 | 日本综合视频 | 激情全身裸吻胸 | 91嫩草国产露脸精品国产 |