1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        面試官問我AQS中的PROPAGATE有什么用?

        共 1840字,需瀏覽 4分鐘

         ·

        2020-12-04 20:01

        ?寫給自己看,說給別人聽。你好,這是think123的第81篇原創(chuàng)文章



        之前分析過AQS的源碼,但只分析了獨占鎖的原理。

        而剛好我們可以借助Semaphore來分析共享鎖。

        如何使用Semaphore

        public?class?SemaphoreDemo?{

        ??public?static?void?main(String[]?args)?{

        ????//?申請共享鎖數(shù)量
        ????Semaphore?sp?=?new?Semaphore(3);

        ????for(int?i?=?0;?i?5;?i++)?{
        ??????new?Thread(()?->?{
        ????????try?{

        ??????????//?獲取共享鎖
        ??????????sp.acquire();

        ??????????String?threadName?=?Thread.currentThread().getName();

        ??????????//?訪問API
        ??????????System.out.println(threadName?+?"?獲取許可,訪問API。剩余許可數(shù)?"?+?sp.availablePermits());

        ??????????TimeUnit.SECONDS.sleep(1);
        ??????????
        ??????????//?釋放共享鎖
        ??????????sp.release();

        ??????????System.out.println(threadName?+?"?釋放許可,當前可用許可數(shù)為?"?+?sp.availablePermits());

        ????????}?catch?(InterruptedException?e)?{
        ????????????e.printStackTrace();
        ????????}

        ??????},?"thread-"?+?(i+1)).start();
        ????}
        ??}
        }

        Java SDK 里面提供了 Lock,為啥還要提供一個 Semaphore ?其實實現(xiàn)一個互斥鎖,僅僅是 Semaphore 的部分功能,Semaphore 還有一個功能是 Lock 不容易實現(xiàn)的,那就是:Semaphore 可以允許多個線程訪問一個臨界區(qū)。

        比較常見的需求就是我們工作中遇到的連接池、對象池、線程池等等池化資源。其中,你可能最熟悉數(shù)據庫連接池,在同一時刻,一定是允許多個線程同時使用連接池的,當然,每個連接在被釋放前,是不允許其他線程使用的。

        比如上面的代碼就演示了同時最多只允許3個線程訪問API。

        如何依托AQS實現(xiàn)Semaphore

        abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
        ??Sync(int?permits)?{
        ????setState(permits);
        ??}

        ??//?獲取鎖
        ??final?int?nonfairTryAcquireShared(int?acquires)?{
        ????for?(;;)?{
        ??????int?available?=?getState();
        ??????int?remaining?=?available?-?acquires;
        ??????if?(remaining?0?||
        ??????????compareAndSetState(available,?remaining))
        ??????????return?remaining;
        ????}
        ??}

        ??//?釋放鎖
        ??protected?final?boolean?tryReleaseShared(int?releases)?{
        ??????for?(;;)?{
        ????????int?current?=?getState();
        ????????int?next?=?current?+?releases;
        ????????if?(next?????????????throw?new?Error("Maximum?permit?count?exceeded");
        ????????if?(compareAndSetState(current,?next))
        ????????????return?true;
        ??????}
        ??}

        }

        Semaphore的acquire/release還是使用到了AQS,它把state的值作為共享資源的數(shù)量。獲取鎖的時候state的值減去1,釋放鎖的時候state的值加上1。

        鎖的獲取

        public?void?acquire()?throws?InterruptedException?{
        ??sync.acquireSharedInterruptibly(1);
        }

        //?AQS
        public?final?void?acquireSharedInterruptibly(int?arg)
        ????throws?InterruptedException?
        {
        ??if?(Thread.interrupted())
        ????throw?new?InterruptedException();
        ??if?(tryAcquireShared(arg)?0)
        ????doAcquireSharedInterruptibly(arg);
        }

        Semaphore也有公平鎖和非公平鎖兩種實現(xiàn),不過都是借助于AQS的,這里默認實現(xiàn)是非公平鎖,所以最終會調用nonfairTryAcquireShared方法。

        鎖的釋放

        public?void?release()?{
        ??sync.releaseShared(1);
        }

        //?AQS
        public?final?boolean?releaseShared(int?arg)?{
        ??if?(tryReleaseShared(arg))?{
        ????doReleaseShared();
        ????return?true;
        ??}
        ??return?false;
        }

        鎖的釋放成功后,會調用doReleaseShared(),這個方法后面會分析。

        獲取鎖失敗

        當獲取鎖失敗后,新的線程就會被加入隊列

        ?public?final?void?acquireSharedInterruptibly(int?arg)
        ????????????throws?InterruptedException?
        {
        ??if?(Thread.interrupted())
        ??????throw?new?InterruptedException();
        ??//?實際調用的是NonFairSync中的nonfairTryAcquireShared
        ??if?(tryAcquireShared(arg)?0)
        ??????doAcquireSharedInterruptibly(arg);
        }

        當鎖的數(shù)量小于0的時候,需要入隊。

        共享鎖調用的方法doAcquireSharedInterruptibly()和獨占鎖調用的方法acquireQueued()只有一些細微的區(qū)別。

        區(qū)別

        首先獨占鎖構造的節(jié)點是模式是EXCLUSIVE,而共享鎖構造模式是SHARED,它們使用的是AQS中的nextWaiter變量來區(qū)分的。

        其次在在準備入隊的時候,如果嘗試獲取共享鎖成功,那么會調用setHeadAndPropagate()方法,重新設置頭節(jié)點并決定是否需要喚醒后繼節(jié)點

        private?void?setHeadAndPropagate(Node?node,?int?propagate)?{
        ??//?舊的頭節(jié)點
        ??Node?h?=?head;
        ??//?將當前獲取到鎖的節(jié)點設置為頭節(jié)點
        ??setHead(node);

        ??//?如果仍然有多的鎖(propagate的值是nonfairTryAcquireShared()返回值)
        ??//?或者舊的頭結點為空,或者頭結點的?ws?小于0
        ??//?又或者新的頭結點為空,或者新頭結點的?ws?小于0,則喚醒后繼節(jié)點
        ??if?(propagate?>?0?||?h?==?null?||?h.waitStatus?0?||
        ??????(h?=?head)?==?null?||?h.waitStatus?0)?{
        ??????Node?s?=?node.next;
        ??????if?(s?==?null?||?s.isShared())
        ??????????doReleaseShared();
        ??}
        }

        private?void?setHead(Node?node)?{
        ??head?=?node;
        ??node.thread?=?null;
        ??node.prev?=?null;
        }

        private?void?doReleaseShared()?{????
        ??for?(;;)?{
        ??????Node?h?=?head;
        ??????//?保證同步隊列中至少有兩個節(jié)點
        ??????if?(h?!=?null?&&?h?!=?tail)?{
        ??????????int?ws?=?h.waitStatus;

        ??????????//?需要喚醒后繼節(jié)點
        ??????????if?(ws?==?Node.SIGNAL)?{
        ??????????????if?(!compareAndSetWaitStatus(h,?Node.SIGNAL,?0))
        ??????????????????continue;????????????//?loop?to?recheck?cases
        ??????????????unparkSuccessor(h);
        ??????????}
        ??????????//?將節(jié)點狀態(tài)更新為PROPAGATE
        ??????????else?if?(ws?==?0?&&
        ???????????????????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE))
        ??????????????continue;????????????????//?loop?on?failed?CAS
        ??????}
        ??????if?(h?==?head)???????????????????//?loop?if?head?changed
        ??????????break;
        ??}
        }

        其實看到這里就有一些邏輯我看不懂了,比如setHeadAndPropagate()方法中的這一段邏輯

        if?(propagate?>?0?||?h?==?null?||?h.waitStatus?0?||
        ??????(h?=?head)?==?null?||?h.waitStatus?0)?{

        寫成這樣不好嗎?

        if(propagate?>?0??&&?h.waitStatus?0)

        為什么要那么復雜呢?而且看上去這樣也可以嘛,我本來想要假裝看懂的,可是我發(fā)現(xiàn)騙自己真的不容易呀。

        這里應該是有什么特殊的原因,不然Doug Lea老爺子不會這么寫。。。。。。

        PROPAGATE狀態(tài)有什么用?

        我就去網上搜索了下,結果在Java的Bug列表中發(fā)現(xiàn)是因為有一個bug才這樣修改的

        https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6801020

        JDK Bug

        看到這個bug在2011年就在JDK6中被修復了,我想說那個時候我還不知道java是啥呢。。。。

        這個修改可以在Doug Lead老爺子的主頁中找到,通過JSR 166找到可對比的CSV,對比1.73和1.74兩個版本

        http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/locks/AbstractQueuedSynchronizer.java?r1=1.73&r2=1.74

        我們先看看setHeadAndPropagate中的修改對比

        對比

        以前的版本中判斷條件是這樣的

        if?(propagate?>?0?&&?node.waitStatus?!=?0)

        這樣的判斷很符合我的認知嘛。但是會造成怎樣的問題呢?按照bug的描述,我們來紙上談兵下沒有PROPAGATE狀態(tài)的時候會出什么問題。

        首先 Semaphore初始化state值為0,然后4個線程分別運行4個任務。線程t1,t2同時獲取鎖,另外兩個線程t3,t3同時釋放鎖

        public?class?TestSemaphore?{

        ??//?這里將信號量設置成了0
        ??private?static?Semaphore?sem?=?new?Semaphore(0);

        ??private?static?class?Thread1?extends?Thread?{
        ????@Override
        ????public?void?run()?{
        ??????//?獲取鎖
        ??????sem.acquireUninterruptibly();
        ????}
        ??}

        ??private?static?class?Thread2?extends?Thread?{
        ????@Override
        ????public?void?run()?{
        ??????//?釋放鎖
        ??????sem.release();
        ????}
        ??}

        ??public?static?void?main(String[]?args)?throws?InterruptedException?{
        ????for?(int?i?=?0;?i?10000000;?i++)?{
        ??????Thread?t1?=?new?Thread1();
        ??????Thread?t2?=?new?Thread1();
        ??????Thread?t3?=?new?Thread2();
        ??????Thread?t4?=?new?Thread2();
        ??????t1.start();
        ??????t2.start();
        ??????t3.start();
        ??????t4.start();
        ??????t1.join();
        ??????t2.join();
        ??????t3.join();
        ??????t4.join();
        ??????System.out.println(i);
        ????}
        ??}
        }

        根據上面的代碼,我們將信號量設置為0,所以t1,t2獲取鎖會失敗。

        假設某次循環(huán)中隊列中的情況如下

        head?-->?t1?-->?t2(tail)

        鎖的釋放由t3先釋放,t4后釋放

        時刻1: 線程t3調用releaseShared(),然后喚醒隊列中節(jié)點(線程t1),此時head的狀態(tài)從-1變成0

        時刻2: 線程t1由于線程t3釋放了鎖,被t3喚醒,然后通過nonfairTryAcquireShared()取得propagate值為0

        再次獲取鎖

        時刻3: 線程t4調用releaseShared(),讀到此時waitStatue為0(和時刻1中的head是同一個head),不滿足條件,因此不喚醒后繼節(jié)點

        diff

        時刻4: ?線程t1獲取鎖成功,調用setHeadAndPropagate(),因為不滿足propagate > 0(時刻2中propagate == 0),從而不會喚醒后繼節(jié)點

        如果沒有PROPAGATE狀態(tài),上面的情況就會導致線程t2不會被喚醒。

        那在引入了propagate之后這個變量又會是怎樣的情況呢?

        時刻1: 線程t3調用doReleaseShared,然后喚醒隊列中結點(線程t1),此時head的狀態(tài)從-1變成0

        時刻2: 線程t1由于t3釋放了信號量,被t3喚醒,然后通過nonfairTryAcquireShared()取得propagate值為0

        時刻3: 線程t4調用releaseShared(),讀到此時waitStatue為0(和時刻1中的head是同一個head),將節(jié)點狀態(tài)設置為PROPAGATE(值為-3)

        ?else?if?(ws?==?0?&&
        ????????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE))
        ???continue;?????//?loop?on?failed?CAS

        時刻4: 線程t1獲取鎖成功,調用setHeadAndPropagate(),雖然不滿足propagate > 0(時刻2中propagate == 0),但是waitStatus<0,所以會去喚醒后繼節(jié)點

        至此我們知道了PROPAGATE的作用,就是為了避免線程無法會喚醒的窘境。,因為共享鎖會有很多線程獲取到鎖或者釋放鎖,所以有些方法是并發(fā)執(zhí)行的,就會產生很多中間狀態(tài),而PROPAGATE就是為了讓這些中間狀態(tài)不影響程序的正常運行。

        doReleaseShared-小方法大智慧

        無論是釋放鎖還是申請到鎖都會調用doReleaseShared()方法,這個方法看似簡單,其實里面的邏輯還是很精妙的。

        private?void?doReleaseShared()?{????
        ??for?(;;)?{
        ????Node?h?=?head;
        ????//?保證同步隊列中至少有兩個節(jié)點
        ????if?(h?!=?null?&&?h?!=?tail)?{
        ??????int?ws?=?h.waitStatus;

        ??????//?需要喚醒后繼節(jié)點
        ??????if?(ws?==?Node.SIGNAL)?{
        ??????????//?可能有其他線程調用doReleaseShared(),unpark操作只需要其中一個調用就行了
        ??????????if?(!compareAndSetWaitStatus(h,?Node.SIGNAL,?0))
        ??????????????continue;?????//?loop?to?recheck?cases
        ??????????unparkSuccessor(h);
        ??????}
        ??????//?將節(jié)點狀態(tài)設置為PROPAGATE(畫重點了)
        ??????else?if?(ws?==?0?&&
        ????????????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE))
        ??????????continue;????//?loop?on?failed?CAS
        ????}
        ????if?(h?==?head)????????//?loop?if?head?changed
        ??????break;
        ??}
        }

        這其中有一個判斷條件

        ws?==?0?&&!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE)

        這個if條件成立也巧妙

        1. 首先隊列中現(xiàn)在至少有兩個節(jié)點,簡化分析,我們認為它只有兩個節(jié)點,head --> node
        2. 執(zhí)行到else if,說明跳過了前面的if條件,說明頭結點是剛成為頭結點的,它的waitStatus為0,尾節(jié)點是在這之后加入的,發(fā)生這種情況是shouldParkAfterFailedAcquire()中還沒來得及將前一個節(jié)點的ws值修改為SIGNAL
        3. CAS失敗說明此時頭結點的ws不為0了,也就表明shouldParkAfterFailedAcquire()已經將前驅節(jié)點的waitStatus值修改為了SIGNAL了
        更新前一個節(jié)點狀態(tài)

        而整個循環(huán)的退出條件是在h==head的時候,這個是為什么呢?

        由于我們的head節(jié)點是一個虛擬節(jié)點(也可以叫做哨兵節(jié)點),假設我們的同步隊列中節(jié)點順序如下:

        head --> A --> B --> C

        現(xiàn)在假設A拿到了共享鎖,那么它將成為新的dummy node(虛擬節(jié)點),

        head(A) --> B --> C

        此時A線程會調用doReleaseShared方法喚醒后繼節(jié)點B,它很快就獲取到了鎖,并成為了新的頭節(jié)點

        head(B) --> C

        此時B線程也會調用該方法,并喚醒其后繼節(jié)點C,但是在B線程調用的時候,線程A可能還沒有運行結束,也正在執(zhí)行這個方法, 當它執(zhí)行到h==head的時候發(fā)現(xiàn)head改變了,所以for循環(huán)就不會退出,又會繼續(xù)執(zhí)行for循環(huán),喚醒后繼節(jié)點。

        至此我們共享鎖分析完畢,其實只要弄明白了AQS的邏輯,依賴于AQS實現(xiàn)的Semaphore就很簡單了。

        在看共享鎖源碼過程中尤其需要注意的是方法是會被多個線程并發(fā)執(zhí)行的,所以其中很多判斷是多線程競爭情況下才會出現(xiàn)的。同時需要注意的是共享鎖并不能保證線程安全,仍然需要程序員自己保證對共享資源的操作是安全的。





        瀏覽 91
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            欧美一级电影在线观看 | 久草热在线观看 | 狠狠的澡狠狠的干 | 安去也俺来了在线观看免费 | 婷婷五月中文字幕 | 欧美夜| 日韩黄色电影 | 蜜桃性爱视频 | 成人免费视频网站 | 国产色色网 |