面試官問我AQS中的PROPAGATE有什么用?
“?寫給自己看,說給別人聽。你好,這是think123的第81篇原創(chuàng)文章”
之前分析過AQS的源碼,但只分析了獨占鎖的原理。
而剛好我們可以借助Semaphore來分析共享鎖。
如何使用Semaphore
public?class?SemaphoreDemo?{
??public?static?void?main(String[]?args)?{
????//?申請共享鎖數(shù)量
????Semaphore?sp?=?new?Semaphore(3);
????for(int?i?=?0;?i?5;?i++)?{
??????new?Thread(()?->?{
????????try?{
??????????//?獲取共享鎖
??????????sp.acquire();
??????????String?threadName?=?Thread.currentThread().getName();
??????????//?訪問API
??????????System.out.println(threadName?+?"?獲取許可,訪問API。剩余許可數(shù)?"?+?sp.availablePermits());
??????????TimeUnit.SECONDS.sleep(1);
??????????
??????????//?釋放共享鎖
??????????sp.release();
??????????System.out.println(threadName?+?"?釋放許可,當前可用許可數(shù)為?"?+?sp.availablePermits());
????????}?catch?(InterruptedException?e)?{
????????????e.printStackTrace();
????????}
??????},?"thread-"?+?(i+1)).start();
????}
??}
}
Java SDK 里面提供了 Lock,為啥還要提供一個 Semaphore ?其實實現(xiàn)一個互斥鎖,僅僅是 Semaphore 的部分功能,Semaphore 還有一個功能是 Lock 不容易實現(xiàn)的,那就是:Semaphore 可以允許多個線程訪問一個臨界區(qū)。
比較常見的需求就是我們工作中遇到的連接池、對象池、線程池等等池化資源。其中,你可能最熟悉數(shù)據庫連接池,在同一時刻,一定是允許多個線程同時使用連接池的,當然,每個連接在被釋放前,是不允許其他線程使用的。
比如上面的代碼就演示了同時最多只允許3個線程訪問API。
如何依托AQS實現(xiàn)Semaphore
abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
??Sync(int?permits)?{
????setState(permits);
??}
??//?獲取鎖
??final?int?nonfairTryAcquireShared(int?acquires)?{
????for?(;;)?{
??????int?available?=?getState();
??????int?remaining?=?available?-?acquires;
??????if?(remaining?0?||
??????????compareAndSetState(available,?remaining))
??????????return?remaining;
????}
??}
??//?釋放鎖
??protected?final?boolean?tryReleaseShared(int?releases)?{
??????for?(;;)?{
????????int?current?=?getState();
????????int?next?=?current?+?releases;
????????if?(next?????????????throw?new?Error("Maximum?permit?count?exceeded");
????????if?(compareAndSetState(current,?next))
????????????return?true;
??????}
??}
}
Semaphore的acquire/release還是使用到了AQS,它把state的值作為共享資源的數(shù)量。獲取鎖的時候state的值減去1,釋放鎖的時候state的值加上1。
鎖的獲取
public?void?acquire()?throws?InterruptedException?{
??sync.acquireSharedInterruptibly(1);
}
//?AQS
public?final?void?acquireSharedInterruptibly(int?arg)
????throws?InterruptedException?{
??if?(Thread.interrupted())
????throw?new?InterruptedException();
??if?(tryAcquireShared(arg)?0)
????doAcquireSharedInterruptibly(arg);
}
Semaphore也有公平鎖和非公平鎖兩種實現(xiàn),不過都是借助于AQS的,這里默認實現(xiàn)是非公平鎖,所以最終會調用nonfairTryAcquireShared方法。
鎖的釋放
public?void?release()?{
??sync.releaseShared(1);
}
//?AQS
public?final?boolean?releaseShared(int?arg)?{
??if?(tryReleaseShared(arg))?{
????doReleaseShared();
????return?true;
??}
??return?false;
}
鎖的釋放成功后,會調用doReleaseShared(),這個方法后面會分析。
獲取鎖失敗
當獲取鎖失敗后,新的線程就會被加入隊列
?public?final?void?acquireSharedInterruptibly(int?arg)
????????????throws?InterruptedException?{
??if?(Thread.interrupted())
??????throw?new?InterruptedException();
??//?實際調用的是NonFairSync中的nonfairTryAcquireShared
??if?(tryAcquireShared(arg)?0)
??????doAcquireSharedInterruptibly(arg);
}
當鎖的數(shù)量小于0的時候,需要入隊。
共享鎖調用的方法doAcquireSharedInterruptibly()和獨占鎖調用的方法acquireQueued()只有一些細微的區(qū)別。

首先獨占鎖構造的節(jié)點是模式是EXCLUSIVE,而共享鎖構造模式是SHARED,它們使用的是AQS中的nextWaiter變量來區(qū)分的。
其次在在準備入隊的時候,如果嘗試獲取共享鎖成功,那么會調用setHeadAndPropagate()方法,重新設置頭節(jié)點并決定是否需要喚醒后繼節(jié)點
private?void?setHeadAndPropagate(Node?node,?int?propagate)?{
??//?舊的頭節(jié)點
??Node?h?=?head;
??//?將當前獲取到鎖的節(jié)點設置為頭節(jié)點
??setHead(node);
??//?如果仍然有多的鎖(propagate的值是nonfairTryAcquireShared()返回值)
??//?或者舊的頭結點為空,或者頭結點的?ws?小于0
??//?又或者新的頭結點為空,或者新頭結點的?ws?小于0,則喚醒后繼節(jié)點
??if?(propagate?>?0?||?h?==?null?||?h.waitStatus?0?||
??????(h?=?head)?==?null?||?h.waitStatus?0)?{
??????Node?s?=?node.next;
??????if?(s?==?null?||?s.isShared())
??????????doReleaseShared();
??}
}
private?void?setHead(Node?node)?{
??head?=?node;
??node.thread?=?null;
??node.prev?=?null;
}
private?void?doReleaseShared()?{????
??for?(;;)?{
??????Node?h?=?head;
??????//?保證同步隊列中至少有兩個節(jié)點
??????if?(h?!=?null?&&?h?!=?tail)?{
??????????int?ws?=?h.waitStatus;
??????????//?需要喚醒后繼節(jié)點
??????????if?(ws?==?Node.SIGNAL)?{
??????????????if?(!compareAndSetWaitStatus(h,?Node.SIGNAL,?0))
??????????????????continue;????????????//?loop?to?recheck?cases
??????????????unparkSuccessor(h);
??????????}
??????????//?將節(jié)點狀態(tài)更新為PROPAGATE
??????????else?if?(ws?==?0?&&
???????????????????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE))
??????????????continue;????????????????//?loop?on?failed?CAS
??????}
??????if?(h?==?head)???????????????????//?loop?if?head?changed
??????????break;
??}
}
其實看到這里就有一些邏輯我看不懂了,比如setHeadAndPropagate()方法中的這一段邏輯
if?(propagate?>?0?||?h?==?null?||?h.waitStatus?0?||
??????(h?=?head)?==?null?||?h.waitStatus?0)?{
寫成這樣不好嗎?
if(propagate?>?0??&&?h.waitStatus?0)
為什么要那么復雜呢?而且看上去這樣也可以嘛,我本來想要假裝看懂的,可是我發(fā)現(xiàn)騙自己真的不容易呀。
這里應該是有什么特殊的原因,不然Doug Lea老爺子不會這么寫。。。。。。
PROPAGATE狀態(tài)有什么用?
我就去網上搜索了下,結果在Java的Bug列表中發(fā)現(xiàn)是因為有一個bug才這樣修改的
https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6801020

看到這個bug在2011年就在JDK6中被修復了,我想說那個時候我還不知道java是啥呢。。。。
這個修改可以在Doug Lead老爺子的主頁中找到,通過JSR 166找到可對比的CSV,對比1.73和1.74兩個版本
http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/locks/AbstractQueuedSynchronizer.java?r1=1.73&r2=1.74
我們先看看setHeadAndPropagate中的修改對比

以前的版本中判斷條件是這樣的
if?(propagate?>?0?&&?node.waitStatus?!=?0)
這樣的判斷很符合我的認知嘛。但是會造成怎樣的問題呢?按照bug的描述,我們來紙上談兵下沒有PROPAGATE狀態(tài)的時候會出什么問題。
首先 Semaphore初始化state值為0,然后4個線程分別運行4個任務。線程t1,t2同時獲取鎖,另外兩個線程t3,t3同時釋放鎖
public?class?TestSemaphore?{
??//?這里將信號量設置成了0
??private?static?Semaphore?sem?=?new?Semaphore(0);
??private?static?class?Thread1?extends?Thread?{
????@Override
????public?void?run()?{
??????//?獲取鎖
??????sem.acquireUninterruptibly();
????}
??}
??private?static?class?Thread2?extends?Thread?{
????@Override
????public?void?run()?{
??????//?釋放鎖
??????sem.release();
????}
??}
??public?static?void?main(String[]?args)?throws?InterruptedException?{
????for?(int?i?=?0;?i?10000000;?i++)?{
??????Thread?t1?=?new?Thread1();
??????Thread?t2?=?new?Thread1();
??????Thread?t3?=?new?Thread2();
??????Thread?t4?=?new?Thread2();
??????t1.start();
??????t2.start();
??????t3.start();
??????t4.start();
??????t1.join();
??????t2.join();
??????t3.join();
??????t4.join();
??????System.out.println(i);
????}
??}
}
根據上面的代碼,我們將信號量設置為0,所以t1,t2獲取鎖會失敗。
假設某次循環(huán)中隊列中的情況如下
head?-->?t1?-->?t2(tail)
鎖的釋放由t3先釋放,t4后釋放
時刻1: 線程t3調用releaseShared(),然后喚醒隊列中節(jié)點(線程t1),此時head的狀態(tài)從-1變成0
時刻2: 線程t1由于線程t3釋放了鎖,被t3喚醒,然后通過nonfairTryAcquireShared()取得propagate值為0

時刻3: 線程t4調用releaseShared(),讀到此時waitStatue為0(和時刻1中的head是同一個head),不滿足條件,因此不喚醒后繼節(jié)點

時刻4: ?線程t1獲取鎖成功,調用setHeadAndPropagate(),因為不滿足propagate > 0(時刻2中propagate == 0),從而不會喚醒后繼節(jié)點
如果沒有PROPAGATE狀態(tài),上面的情況就會導致線程t2不會被喚醒。
那在引入了propagate之后這個變量又會是怎樣的情況呢?
時刻1: 線程t3調用doReleaseShared,然后喚醒隊列中結點(線程t1),此時head的狀態(tài)從-1變成0
時刻2: 線程t1由于t3釋放了信號量,被t3喚醒,然后通過nonfairTryAcquireShared()取得propagate值為0
時刻3: 線程t4調用releaseShared(),讀到此時waitStatue為0(和時刻1中的head是同一個head),將節(jié)點狀態(tài)設置為PROPAGATE(值為-3)
?else?if?(ws?==?0?&&
????????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE))
???continue;?????//?loop?on?failed?CAS
時刻4: 線程t1獲取鎖成功,調用setHeadAndPropagate(),雖然不滿足propagate > 0(時刻2中propagate == 0),但是waitStatus<0,所以會去喚醒后繼節(jié)點
至此我們知道了PROPAGATE的作用,就是為了避免線程無法會喚醒的窘境。,因為共享鎖會有很多線程獲取到鎖或者釋放鎖,所以有些方法是并發(fā)執(zhí)行的,就會產生很多中間狀態(tài),而PROPAGATE就是為了讓這些中間狀態(tài)不影響程序的正常運行。
doReleaseShared-小方法大智慧
無論是釋放鎖還是申請到鎖都會調用doReleaseShared()方法,這個方法看似簡單,其實里面的邏輯還是很精妙的。
private?void?doReleaseShared()?{????
??for?(;;)?{
????Node?h?=?head;
????//?保證同步隊列中至少有兩個節(jié)點
????if?(h?!=?null?&&?h?!=?tail)?{
??????int?ws?=?h.waitStatus;
??????//?需要喚醒后繼節(jié)點
??????if?(ws?==?Node.SIGNAL)?{
??????????//?可能有其他線程調用doReleaseShared(),unpark操作只需要其中一個調用就行了
??????????if?(!compareAndSetWaitStatus(h,?Node.SIGNAL,?0))
??????????????continue;?????//?loop?to?recheck?cases
??????????unparkSuccessor(h);
??????}
??????//?將節(jié)點狀態(tài)設置為PROPAGATE(畫重點了)
??????else?if?(ws?==?0?&&
????????????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE))
??????????continue;????//?loop?on?failed?CAS
????}
????if?(h?==?head)????????//?loop?if?head?changed
??????break;
??}
}
這其中有一個判斷條件
ws?==?0?&&!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE)
這個if條件成立也巧妙
首先隊列中現(xiàn)在至少有兩個節(jié)點,簡化分析,我們認為它只有兩個節(jié)點,head --> node 執(zhí)行到else if,說明跳過了前面的if條件,說明頭結點是剛成為頭結點的,它的waitStatus為0,尾節(jié)點是在這之后加入的,發(fā)生這種情況是shouldParkAfterFailedAcquire()中還沒來得及將前一個節(jié)點的ws值修改為SIGNAL CAS失敗說明此時頭結點的ws不為0了,也就表明shouldParkAfterFailedAcquire()已經將前驅節(jié)點的waitStatus值修改為了SIGNAL了

而整個循環(huán)的退出條件是在h==head的時候,這個是為什么呢?
由于我們的head節(jié)點是一個虛擬節(jié)點(也可以叫做哨兵節(jié)點),假設我們的同步隊列中節(jié)點順序如下:
head --> A --> B --> C
現(xiàn)在假設A拿到了共享鎖,那么它將成為新的dummy node(虛擬節(jié)點),
head(A) --> B --> C
此時A線程會調用doReleaseShared方法喚醒后繼節(jié)點B,它很快就獲取到了鎖,并成為了新的頭節(jié)點
head(B) --> C
此時B線程也會調用該方法,并喚醒其后繼節(jié)點C,但是在B線程調用的時候,線程A可能還沒有運行結束,也正在執(zhí)行這個方法,
當它執(zhí)行到h==head的時候發(fā)現(xiàn)head改變了,所以for循環(huán)就不會退出,又會繼續(xù)執(zhí)行for循環(huán),喚醒后繼節(jié)點。
至此我們共享鎖分析完畢,其實只要弄明白了AQS的邏輯,依賴于AQS實現(xiàn)的Semaphore就很簡單了。
在看共享鎖源碼過程中尤其需要注意的是方法是會被多個線程并發(fā)執(zhí)行的,所以其中很多判斷是多線程競爭情況下才會出現(xiàn)的。同時需要注意的是共享鎖并不能保證線程安全,仍然需要程序員自己保證對共享資源的操作是安全的。
