面試官:從源碼角度講講ReentrantLock及隊(duì)列同步器(AQS)
你知道的越多,不知道的就越多,業(yè)余的像一棵小草!
你來(lái),我們一起精進(jìn)!你不來(lái),我和你的競(jìng)爭(zhēng)對(duì)手一起精進(jìn)!
編輯:業(yè)余草
blog.csdn.net/fuzhongmin05
推薦:https://www.xttblog.com/?p=5178
JDK 中獨(dú)占鎖(排他鎖)的實(shí)現(xiàn)除了使用關(guān)鍵字 synchronized 外,還可以使用ReentrantLock。雖然在性能上 ReentrantLock 和 synchronized 沒(méi)有什么大區(qū)別,但 ReentrantLock 相比 synchronized 而言功能更加豐富,使用起來(lái)更為靈活,也更適合復(fù)雜的并發(fā)場(chǎng)景。
ReentrantLock 常常對(duì)比著 synchronized 來(lái)分析,我們先對(duì)比著來(lái)看然后再一點(diǎn)一點(diǎn)分析。
synchronized 是獨(dú)占鎖,加鎖和解鎖的過(guò)程自動(dòng)進(jìn)行,易于操作,但不夠靈活。ReentrantLock 也是獨(dú)占鎖,加鎖和解鎖的過(guò)程需要手動(dòng)進(jìn)行,不易操作,但非常靈活。 synchronized 可重入,因?yàn)榧渔i和解鎖自動(dòng)進(jìn)行,不必?fù)?dān)心最后是否釋放鎖;ReentrantLock 也可重入,但加鎖和解鎖需要手動(dòng)進(jìn)行,且次數(shù)需一樣,否則其他線程無(wú)法獲得鎖。 synchronized 不可響應(yīng)中斷,一個(gè)線程獲取不到鎖就一直等著;ReentrantLock 可以相應(yīng)中斷。
ReentrantLock 好像比 synchronized 關(guān)鍵字沒(méi)好太多,我們?cè)偃タ纯?synchronized 所沒(méi)有的。一個(gè)最主要的就是 ReentrantLock 還可以實(shí)現(xiàn)公平鎖機(jī)制。什么叫公平鎖呢?也就是在鎖上等待時(shí)間最長(zhǎng)的線程將獲得鎖的使用權(quán)。通俗的理解就是誰(shuí)排隊(duì)時(shí)間最長(zhǎng)誰(shuí)先執(zhí)行獲取鎖。
在講解 ReentrantLock 之前,必須先要了解一個(gè)叫 AQS(隊(duì)列同步器)的東西,這個(gè)東西也是并發(fā)包的,AQS 就是AbstractQueuedSynchronizer,它是 Java 并發(fā)包中的一個(gè)核心,ReentrantLock 以及ReentrantReadWriteLock都是基于 AQS 實(shí)現(xiàn)的。想要搞明白 ReentrantLock 是如何實(shí)現(xiàn)的,就必須先學(xué)習(xí)AbstractQueuedSynchronizer。本文結(jié)合 JDK1.8 的源碼先剖析下隊(duì)列同步器,然后再詳解 ReentrantLock 中如何應(yīng)用隊(duì)列同步器實(shí)現(xiàn)獨(dú)占鎖的。
隊(duì)列同步器(AQS)
隊(duì)列同步器 AbstractQueuedSynchronizer,是用來(lái)構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,它使用了一個(gè) volatile 修飾的 int 成員變量表示同步狀態(tài),通過(guò)內(nèi)置的 FIFO 隊(duì)列來(lái)完成資源獲取線程的排隊(duì)工作。
同步器的主要使用方式是繼承,子類通過(guò)繼承同步器并實(shí)現(xiàn)它的抽象方法來(lái)管理同步狀態(tài),在抽象方法的實(shí)現(xiàn)過(guò)程中免不了要對(duì)同步狀態(tài)進(jìn)行更改,這時(shí)就需要同步器提供的 3 個(gè)方法getState()、setState(int newState)和compareAndSetState(int expect, int update)來(lái)進(jìn)行操作,因?yàn)樗鼈兡軌虮WC狀態(tài)的改變是安全的。子類推薦被定義為自定義同步組件的靜態(tài)內(nèi)部類,同步器自身沒(méi)有實(shí)現(xiàn)任何同步接口,它僅僅是定義了若干同步狀態(tài)獲取和釋放方法來(lái)供自定義同步組件使用,同步器即可以支持獨(dú)占式獲取同步狀態(tài),也可以支持共享式地獲取同步狀態(tài),這樣方便實(shí)現(xiàn)不同類型的同步組件(ReentrantLock、ReentrantReadWriteLock、CountDownLatch等)。
AQS 是實(shí)現(xiàn)鎖(也可以是任何同步組件)的關(guān)鍵:在鎖中聚合同步器,利用同步器實(shí)現(xiàn)鎖的語(yǔ)義。兩者的關(guān)系:鎖是面向使用者的,他定義了使用者與鎖交互的接口(比如允許兩個(gè)線程并行訪問(wèn)),隱藏了實(shí)現(xiàn)細(xì)節(jié);同步器是面向鎖的實(shí)現(xiàn)者,它簡(jiǎn)化了鎖的實(shí)現(xiàn)方式,屏蔽了同步管理狀態(tài)、線程的排隊(duì)、等待與喚醒等底層操作。
隊(duì)列同步器的接口及模板方法
同步器基于模板設(shè)計(jì)模式實(shí)現(xiàn)的,使用者需要繼承同步器并重寫指定的方法,隨后將同步器組合在自定義的同步組件的實(shí)現(xiàn)中,并調(diào)用同步器提供的模板方法,而這些模板方法會(huì)調(diào)用使用者重寫的方法。
重寫同步器指定方法時(shí)需要使用同步器提供的如下三個(gè)方法來(lái)訪問(wèn)或修改同步狀態(tài):
getState():獲取當(dāng)前同步狀態(tài)setState(int new State):設(shè)置當(dāng)前同步狀態(tài)compareAndState(int expect,int update):使用 CAS 設(shè)置當(dāng)前狀態(tài),該方法能夠保證狀態(tài)設(shè)置的原子性。
同步器可重寫的方法:

實(shí)現(xiàn)自定義同步組件時(shí),將會(huì)調(diào)用AbstractQueuedSynchronizer自身已經(jīng)寫好的模板方法,這些模板方法與描述:

注:模板方法基本分為三類:獨(dú)占式同步狀態(tài)獲取與釋放、共享式同步狀態(tài)獲取與釋放和查詢同步隊(duì)列中等待線程情況。
AbstractQueuedSynchronizer要求子類必須覆寫的方法如下,之所以要求子類重寫這些方法,是為了讓使用者可以在其中加入自己的判斷邏輯。

AQS 中提供了很多關(guān)于鎖的實(shí)現(xiàn)方法
getState():獲取鎖的標(biāo)志 state 值setState():設(shè)置鎖的標(biāo)志 state 值tryAcquire(int):獨(dú)占方式獲取鎖。嘗試獲取資源,成功則返回 true,失敗則返回 falsetryRelease(int):獨(dú)占方式釋放鎖。嘗試釋放資源,成功則返回 true,失敗則返回 false
隊(duì)列同步器數(shù)據(jù)結(jié)構(gòu)
同步器依賴于內(nèi)部的同步隊(duì)列(一個(gè)FIFO雙向隊(duì)列)來(lái)完成同步狀態(tài)的管理,當(dāng)前線程獲取同步狀態(tài)失敗時(shí),同步器會(huì)將當(dāng)前線程以及等待狀態(tài)等信息構(gòu)成一個(gè)節(jié)點(diǎn)(Node)并將其加入同步隊(duì)列,同時(shí)阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時(shí),會(huì)將首節(jié)點(diǎn)中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。
在AbstractQueuedSynchronizer類中,Node 類是靜態(tài)內(nèi)部類,其源碼如下:
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
//等待狀態(tài)
volatile int waitStatus;
//指向前一個(gè)結(jié)點(diǎn)的指針
volatile Node prev;
//指向后一個(gè)結(jié)點(diǎn)的指針
volatile Node next;
//當(dāng)前結(jié)點(diǎn)代表的線程
volatile Thread thread;
//等待隊(duì)列中的后繼節(jié)點(diǎn),如果當(dāng)前節(jié)點(diǎn)是共享的,那么這個(gè)字段將是SHARED常量,即節(jié)點(diǎn)類型(獨(dú)占和共享)和等待隊(duì)列中的后繼節(jié)點(diǎn)共用同一個(gè)字段
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
同時(shí),在AbstractQueuedSynchronizer類中,又單獨(dú)定義了隊(duì)列頭結(jié)點(diǎn)、尾結(jié)點(diǎn)、同步狀態(tài)變量。
//指向隊(duì)列頭結(jié)點(diǎn)
private transient volatile Node head;
//指向隊(duì)列尾結(jié)點(diǎn)
private transient volatile Node tail;
//同步狀態(tài)變量
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
同步隊(duì)列的結(jié)構(gòu)如圖:

為了接下來(lái)能夠更好的理解加鎖和解鎖過(guò)程的源碼,對(duì)該同步隊(duì)列的特性進(jìn)行簡(jiǎn)單的說(shuō)明:
同步隊(duì)列是個(gè)先進(jìn)先出(FIFO)隊(duì)列,獲取鎖失敗的線程將構(gòu)造結(jié)點(diǎn)并加入隊(duì)列的尾部,加入隊(duì)列的過(guò)程必須保證線程安全,為什么必須保證線程安全?因?yàn)橐鎸?duì)同時(shí)有多條線程沒(méi)有獲取到同步狀態(tài)要加入同步隊(duì)列尾部的情況; 隊(duì)列首結(jié)點(diǎn)是獲取同步狀態(tài)成功的線程節(jié)點(diǎn); 前驅(qū)結(jié)點(diǎn)線程釋放鎖后將嘗試喚醒后繼結(jié)點(diǎn)中處于阻塞狀態(tài)的線程
同步器將節(jié)點(diǎn)加入同步隊(duì)列的過(guò)程:

同步隊(duì)列遵循 FIFO,首節(jié)點(diǎn)是獲取同步狀態(tài)成功的節(jié)點(diǎn),首節(jié)點(diǎn)的線程在釋放同步狀態(tài)時(shí),將會(huì)喚醒后繼節(jié)點(diǎn),而后繼節(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)將自己設(shè)置為新的首節(jié)點(diǎn)。

注:設(shè)置首節(jié)點(diǎn)是通過(guò)由已經(jīng)獲取到了同步狀態(tài)的線程來(lái)完成的,由于只有一個(gè)線程能夠獲取到同步狀態(tài),因此設(shè)置頭節(jié)點(diǎn)的方法并不需要 CAS 來(lái)保障,它只需要讓head 指針指向原首節(jié)點(diǎn)的后繼節(jié)點(diǎn)并斷開(kāi)原首節(jié)點(diǎn)的 next 引用即可。
隊(duì)列同步器提供的獨(dú)占式同步狀態(tài)獲取方法
通過(guò)AbstractQueuedSynchronizer類提供的acquire(int arg)方法可以獲取同步狀態(tài),該方法對(duì)中斷不敏感,也就是說(shuō)由于線程獲取同步狀態(tài)失敗后進(jìn)入同步隊(duì)列中,后繼對(duì)線程進(jìn)行中斷操作時(shí),線程不會(huì)從同步隊(duì)列移除。acquire 方法:
public final void acquire(int arg) {
//tryAcquie()方法具體要交給子類去實(shí)現(xiàn),AbstractQueuedSynchronizer類中不實(shí)現(xiàn)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述代碼中完成了同步狀態(tài)的獲取、節(jié)點(diǎn)構(gòu)造、加入同步隊(duì)列以及同步隊(duì)列中自旋等待的相關(guān)工作。
首先調(diào)用自定義同步器(AbstractQueuedSynchronizer的子類)實(shí)現(xiàn)的tryAcquire(int arg)方法,該方法保證線程安全的獲取同步狀態(tài),如果同步狀態(tài)獲取失敗,則構(gòu)造同步節(jié)點(diǎn)(獨(dú)占式Node.EXCLUSIVE,同一時(shí)刻只能有一個(gè)線程成功獲取同步狀態(tài))并通過(guò)addWaiter(Node node)方法將該節(jié)點(diǎn)加入到同步隊(duì)列的尾部,最后調(diào)用acquireQueued(Node node,int arg)方法,使得該節(jié)點(diǎn)以“死循環(huán)”的方式獲取同步狀態(tài)。如果獲取不到就阻塞節(jié)點(diǎn)中的線程,而被阻塞線程的喚醒主要依靠前驅(qū)節(jié)點(diǎn)的出隊(duì)或阻塞線程被中斷來(lái)實(shí)現(xiàn)。
節(jié)點(diǎn)的構(gòu)造以及加入同步隊(duì)列依靠于 addWaiter 和 enq 方法:
private Node addWaiter(Node mode) {
首先創(chuàng)建一個(gè)新節(jié)點(diǎn),并將當(dāng)前線程實(shí)例封裝在內(nèi)部,mode這里為null
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
///入隊(duì)的邏輯
enq(node);
return node;
}
/**
* 隊(duì)列不空時(shí)向尾部添加結(jié)點(diǎn)的邏輯在enq(node)方法中也有,之所以會(huì)有這部分“重復(fù)代碼”是對(duì)某些特殊情況進(jìn)行提前處理,犧牲一定的代碼可讀性換取性能提升。
*/
private Node enq(final Node node) {
for (;;) {
//t指向當(dāng)前隊(duì)列的最后一個(gè)節(jié)點(diǎn),隊(duì)列為空則為null
Node t = tail;
//隊(duì)列為空
if (t == null) {
//此時(shí)鏈表沒(méi)有節(jié)點(diǎn),需要初始化讓head跟tail都指向一個(gè)哨兵節(jié)點(diǎn)
//構(gòu)造新結(jié)點(diǎn),CAS方式設(shè)置為隊(duì)列首元素,當(dāng)head==null時(shí)更新成功
if (compareAndSetHead(new Node()))
tail = head;//尾指針指向首結(jié)點(diǎn)
} else { //隊(duì)列不為空
node.prev = t;
if (compareAndSetTail(t, node)) { //CAS將尾指針指向當(dāng)前結(jié)點(diǎn),當(dāng)t(原來(lái)的尾指針)==tail(當(dāng)前真實(shí)的尾指針)時(shí)執(zhí)行成功
t.next = node; //原尾結(jié)點(diǎn)的next指針指向當(dāng)前結(jié)點(diǎn)
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//死循環(huán),正常情況下線程只有獲得鎖才能跳出循環(huán)
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//將當(dāng)前結(jié)點(diǎn)設(shè)置為隊(duì)列頭結(jié)點(diǎn)
setHead(node);
p.next = null; // help GC
failed = false;
//正常情況下死循環(huán)唯一的出口
return interrupted;
}
//判斷是否要阻塞當(dāng)前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
看完enq(final Node node)方法后,發(fā)現(xiàn)事實(shí)上 AQS 隊(duì)列的頭節(jié)點(diǎn)其實(shí)是個(gè)哨兵節(jié)點(diǎn)。
在enq(final Node node)中,同步器通過(guò)死循環(huán)的方式來(lái)確保節(jié)點(diǎn)的添加,在死循環(huán)中只有通過(guò) CAS 將當(dāng)前節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)之后,當(dāng)前線程才能從該方法返回,否則的話當(dāng)前線程不斷地嘗試設(shè)置。enq(final Node node)方法將并發(fā)添加節(jié)點(diǎn)的請(qǐng)求通過(guò) CAS 變得“串行化”了。循環(huán)加 CAS 操作是實(shí)現(xiàn)樂(lè)觀鎖的標(biāo)準(zhǔn)方式,CAS 是為了實(shí)現(xiàn)原子操作而出現(xiàn)的,所謂的原子操作指操作執(zhí)行期間,不會(huì)受其他線程的干擾。Java 實(shí)現(xiàn)的 CAS 是調(diào)用 unsafe 類提供的方法,底層是調(diào)用 C++ 方法,直接操作內(nèi)存,在 CPU 層面加鎖,直接對(duì)內(nèi)存進(jìn)行操作。
來(lái)看shouldParkAfterFailedAcquire(Node pred, Node node),從方法名上我們可以大概猜出這是判斷是否要阻塞當(dāng)前線程的,源碼如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //狀態(tài)為SIGNAL
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { //狀態(tài)為CANCELLED,
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { //狀態(tài)為初始化狀態(tài)(ReentrentLock語(yǔ)境下)
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
可以看到針對(duì)前驅(qū)結(jié)點(diǎn) pred 的狀態(tài)會(huì)進(jìn)行不同的處理:
pred 狀態(tài)為 SIGNAL,則返回 true,表示要阻塞當(dāng)前線程; pred 狀態(tài)為 CANCELLED,則一直往隊(duì)列頭部回溯直到找到一個(gè)狀態(tài)不為CANCELLED 的結(jié)點(diǎn),將當(dāng)前節(jié)點(diǎn) node 掛在這個(gè)結(jié)點(diǎn)的后面; pred 的狀態(tài)為初始化狀態(tài),此時(shí)通過(guò) CAS 操作將 pred 的狀態(tài)改為 SIGNAL
其實(shí)這個(gè)方法的含義很簡(jiǎn)單,就是確保當(dāng)前結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)的狀態(tài)為 SIGNAL,SIGNAL 意味著線程釋放鎖后會(huì)喚醒后面阻塞的線程。畢竟,只有確保能夠被喚醒,當(dāng)前線程才能放心的阻塞。
要注意只有在前驅(qū)結(jié)點(diǎn)已經(jīng)是 SIGNAL 狀態(tài)后才會(huì)執(zhí)行后面的方法立即阻塞,對(duì)應(yīng)上面的第一種情況。其他兩種情況則因?yàn)榉祷?false 而重新執(zhí)行一遍
acquireQueued()方法的源碼表明節(jié)點(diǎn)在進(jìn)入隊(duì)列后,就進(jìn)入了一個(gè)自旋狀態(tài),每個(gè)節(jié)點(diǎn)(或者說(shuō)每個(gè)線程),都在自省觀察,當(dāng)條件滿足,獲取到同步狀態(tài),就可以從這個(gè)自旋過(guò)程中退出,否則依舊留在自旋過(guò)程中,
這個(gè)過(guò)程如下圖所示。

在acquireQueued(final Node node, int arg)方法中,線程在“死循環(huán)”中嘗試獲取同步狀態(tài),而只有前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)時(shí)才能夠嘗試獲取同步狀態(tài),原因如下:
頭節(jié)點(diǎn)是成功獲取到同步狀態(tài)的節(jié)點(diǎn),而頭節(jié)點(diǎn)線程獲取到同步狀態(tài)后,將會(huì)喚醒其后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)的線程在被喚醒后需要檢查自己的前驅(qū)節(jié)點(diǎn)是否是頭節(jié)點(diǎn); 維護(hù)同步隊(duì)列的 FIFO 原則
可以看到節(jié)點(diǎn)與及節(jié)點(diǎn)之間在循環(huán)檢查的過(guò)程中基本上不相互通信,而是簡(jiǎn)單地判斷自己的前驅(qū)是否為頭節(jié)點(diǎn),這樣就使得節(jié)點(diǎn)的釋放符合 FIFO,并且對(duì)于方便對(duì)過(guò)早通知進(jìn)行處理。
獨(dú)占式同步狀態(tài)獲取流程如下圖,也就是acquire(int arg)方法的執(zhí)行流程:

當(dāng)同步狀態(tài)獲取成功,當(dāng)前線程從acquire(int arg)方法返回,這也就代表著當(dāng)前線程獲得了鎖。
隊(duì)列同步器提供的獨(dú)占式同步狀態(tài)釋放方法
釋放同步狀態(tài),通過(guò)調(diào)用同步器的release(int arg)方法可以釋放同步狀態(tài),該方法在釋放了同步狀態(tài)后,會(huì)喚醒其后繼節(jié)點(diǎn)(進(jìn)而使后繼節(jié)點(diǎn)重新嘗試獲取同步狀態(tài))。解鎖的源碼相對(duì)簡(jiǎn)單,代碼如下:
public final boolean release(int arg) {
//tryRelease()方法具體要交給子類去實(shí)現(xiàn),AbstractQueuedSynchronizer類中不實(shí)現(xiàn)
if (tryRelease(arg)) {
Node h = head;
//當(dāng)前隊(duì)列不為空且頭結(jié)點(diǎn)狀態(tài)不為初始化狀態(tài)(0)
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //喚醒同步隊(duì)列中被阻塞的線程
return true;
}
return false;
}
若當(dāng)前線程已經(jīng)完全釋放鎖,即鎖可被其他線程使用,則還應(yīng)該喚醒后續(xù)等待線程。不過(guò)在此之前需要進(jìn)行兩個(gè)條件的判斷:
h != null是為了防止隊(duì)列為空,即沒(méi)有任何線程處于等待隊(duì)列中,那么也就不需要進(jìn)行喚醒的操作;h.waitStatus != 0是為了防止隊(duì)列中雖有線程,但該線程還未阻塞,由前面的分析知,線程在阻塞自己前必須設(shè)置其前驅(qū)結(jié)點(diǎn)的狀態(tài)為 SIGNAL,否則它不會(huì)阻塞自己;
該方法執(zhí)行時(shí),會(huì)喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程,unparkSuccerssor(Node node)方法使用 LcokSupport 來(lái)喚醒處于等待(阻塞)狀態(tài)的線程。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
//從尾部向頭部遍歷
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
一般情況下只要喚醒后繼結(jié)點(diǎn)的線程就行了,但是后繼結(jié)點(diǎn)可能已經(jīng)取消等待,所以從隊(duì)列尾部往前回溯,找到離頭結(jié)點(diǎn)最近的正常結(jié)點(diǎn),并喚醒其線程。
獨(dú)占式同步狀態(tài)釋放流程如下圖,也就是release(int arg)方法的執(zhí)行流程:

獨(dú)占式同步狀態(tài)獲取和釋放:
在獲取同步狀態(tài)時(shí),同步器會(huì)維持一個(gè)同步隊(duì)列,獲取失敗的線程都會(huì)被加入到同步隊(duì)列中,并在同步隊(duì)列中自旋(判斷自己前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn))。 移出隊(duì)列(停止自旋)的條件是前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時(shí),同步器調(diào)用 tryRelease(int arg)方法釋放同步狀態(tài),然后喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)。
共享式同步狀態(tài)獲取與釋放
共享式獲取與獨(dú)占式獲取的區(qū)別:同一時(shí)刻能否有多個(gè)線程同時(shí)獲取到同步狀態(tài)。
以文件的讀寫為例:
如果有一個(gè)程序在讀文件,那么這一時(shí)刻的寫操作均被阻塞,而讀操作能夠同時(shí)進(jìn)行。 如果有一個(gè)程序在寫文件,那么這一時(shí)刻不管是其他的寫操作還是讀操作,均被阻塞。 寫操作要求對(duì)資源的獨(dú)占式訪問(wèn),而讀操作可以是共享式訪問(wèn)。
調(diào)用同步器的acquireShared()模板方法,可以實(shí)現(xiàn)共享式獲取同步狀態(tài)。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
//當(dāng)前節(jié)點(diǎn)加入同步隊(duì)列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
//前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn),就繼續(xù)嘗試獲取同步狀態(tài)
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//用LockSupport的park方法把當(dāng)前線程阻塞住
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
當(dāng)前線程首先調(diào)用 tryAcquireShared()這個(gè)被子類重寫的方法,共享式的獲取同步狀態(tài)。如果返回值大于等于 0,表示獲取成功并返回。如果返回值小于0表示獲取失敗,調(diào)用 doAcquireShared() 方法,讓線程進(jìn)入自旋狀態(tài)。 自旋過(guò)程中,如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn),且調(diào)用 tryAcquireShared()方法返回值大于等于 0,則退出自旋。否則,繼續(xù)進(jìn)行自旋。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
首先去嘗試釋放資源tryReleaseShared(arg),如果釋放成功了,就代表有資源空閑出來(lái),那么就用 doReleaseShared() 去喚醒后續(xù)結(jié)點(diǎn)。
ReentrantLock 對(duì)隊(duì)列同步器的應(yīng)用
syncronized 關(guān)鍵字隱式的支持重進(jìn)入,比如 syncronized 修飾一個(gè)遞歸方法,在方法執(zhí)行時(shí),執(zhí)行線程在獲取了鎖之后仍能連續(xù)多次地獲取該鎖。
ReentrantLock 雖然沒(méi)能像 synchronized 關(guān)鍵字一樣支持隱式的重進(jìn)入,但是在調(diào)用 lock() 方法時(shí),已經(jīng)獲取了鎖的線程,能夠再次調(diào)用 lock() 方法而不被阻塞。要實(shí)現(xiàn)可重入的特性,就要解決以下兩個(gè)問(wèn)題:
線程再次獲取鎖,鎖需要去識(shí)別獲取鎖的線程是否為當(dāng)前占據(jù)鎖的線程,如果是,則再次成功獲取 鎖的最終釋放,線程重復(fù) n 次獲取了鎖,隨后在第 n 次釋放該鎖后,其他線程能夠獲取到鎖。鎖的最終釋放要求鎖對(duì)于獲取進(jìn)行自增,對(duì)于釋放進(jìn)行自減,當(dāng)計(jì)數(shù)等于0時(shí)表示鎖已經(jīng)成功釋放。
ReentrantLock 通過(guò)組合自定義隊(duì)列同步器來(lái)實(shí)現(xiàn)鎖的可重入式獲取與釋放。
ReentrantLock 的類圖如下,可以看出 ReentrantLock 實(shí)現(xiàn) Lock 接口,Sync 與 ReentrantLock 是組合關(guān)系,且 FairSync(公平鎖)、NonfairySync(非公平鎖)是 Sync 的子類。

公平鎖是 FairSync,非公平鎖是 NonfairSync。而不管是 FairSync 還是 NonfariSync,都間接繼承自 AbstractQueuedSynchronizer 這個(gè)抽象類,如下圖所示

ReentrantLock 非公平模式獲取及釋放鎖
ReentrantLock 非公平模式下的獲取鎖的代碼如下:
//實(shí)現(xiàn)Lock接口的lock方法,調(diào)用本方法當(dāng)前線程獲取鎖,拿鎖成功后就返回
public void lock() {
//非公平模式下,sync指向的對(duì)象類型是NonfairSync
sync.lock();
}
//實(shí)現(xiàn)Lock接口的tryLock方法,嘗試非阻塞的獲取鎖,調(diào)用本方法后立刻返回,如果能獲取到鎖則返回true,否則返回false
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
靜態(tài)內(nèi)部類 NonfairSync 的源碼如下:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//首先CAS嘗試下獲取鎖,先假設(shè)每次lock都是非重入
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//這里調(diào)用的是父類AbstractQueuedSynchronizer的acquire()方法,
//而acquire()方法中又要調(diào)用交由子類去實(shí)現(xiàn)的tryAcquiretryAcquire()方法
//所以會(huì)調(diào)到下面NonfairSync類的tryAcquire()方法
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
nonfairTryAcquire()方法的源碼如下:
//本方法寫在Sync類中,而不是FairSync類中
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//如果當(dāng)前鎖閑置,就CAS嘗試下獲取鎖
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
nonfairTryAcquire()方法增加了再次獲取同步狀態(tài)的處理邏輯:通過(guò)判斷當(dāng)前線程是否為已經(jīng)獲取了鎖的線程來(lái)決定獲取操作是否成功,如果是則將同步狀態(tài)值增加并返回 true,表示獲取同步狀態(tài)成功。
ReentrantLock 類的 unlock 方法:
public void unlock() {
//這里調(diào)用的是父類AbstractQueuedSynchronizer的release()方法,
//而release()方法中又要調(diào)用交由子類去實(shí)現(xiàn)的tryRelease()方法
//所以會(huì)調(diào)到Sync類的tryRelease()方法
sync.release(1);
}
由于公平鎖與非公平鎖的差異主要體現(xiàn)在獲取鎖上,因此 tryAcquire() 方法由NonfairSync 類與 FairSync 類分別去實(shí)現(xiàn),而無(wú)論是公平鎖還是非公平鎖,鎖的釋放過(guò)程都是一樣的,因此 tryRelease() 方法由 Sync 類來(lái)實(shí)現(xiàn)。源碼如下:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
如果該鎖被獲取了 N 次,那么前 (N-1) 次tryRelease(int releases)方法必須返回 false,而只有同步狀態(tài)完全釋放了,才能返回 true。
ReentrantLock 公平模式獲取及釋放鎖
公平鎖模式下,對(duì)鎖的獲取有嚴(yán)格的條件限制。在同步隊(duì)列有線程等待的情況下,所有線程在獲取鎖前必須先加入同步隊(duì)列,隊(duì)列中的線程按加入隊(duì)列的先后次序獲得鎖。
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
//這里調(diào)用的是父類AbstractQueuedSynchronizer的acquire()方法
//父類的acquire()方法中會(huì)調(diào)用tryAcquire()方法,該方法由子類去實(shí)現(xiàn)
//即最終會(huì)調(diào)用到FairSync實(shí)現(xiàn)的tryAcquire()方法
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//在真正CAS獲取鎖之前加了hasQueuedPredecessors()方法
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
//溢出了
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
hasQueuedPredecessors()方法在AbstractQueuedSynchronizer類中(模板方法模式的典型應(yīng)用,所有公共的方法全部都由AbstractQueuedSynchronizer寫好,只有個(gè)性化邏輯才下沉給子類去實(shí)現(xiàn)),源碼如下:
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
從方法名我們就可知道這是判斷隊(duì)列中是否有優(yōu)先級(jí)更高的等待線程,隊(duì)列中哪個(gè)線程優(yōu)先級(jí)最高?由于頭結(jié)點(diǎn)是當(dāng)前獲取鎖的線程,隊(duì)列中的第二個(gè)結(jié)點(diǎn)代表的線程優(yōu)先級(jí)最高。那么只要判斷隊(duì)列中第二個(gè)結(jié)點(diǎn)是否存在以及這個(gè)結(jié)點(diǎn)是否代表當(dāng)前線程就行了。這里分了兩種情況進(jìn)行探討:
第二個(gè)結(jié)點(diǎn)已經(jīng)完全插入,但是這個(gè)結(jié)點(diǎn)是否就是當(dāng)前線程所在結(jié)點(diǎn)還未知,所以通過(guò)
s.thread != Thread.currentThread()進(jìn)行判斷,如果為 true,說(shuō)明第二個(gè)結(jié)點(diǎn)代表其他線程。第二個(gè)結(jié)點(diǎn)并未完全插入,我們知道結(jié)點(diǎn)入隊(duì)一共分三步:
待插入結(jié)點(diǎn)的 pre 指針指向原尾結(jié)點(diǎn); CAS 更新尾指針; 原尾結(jié)點(diǎn)的 next 指針指向新插入結(jié)點(diǎn)。所以 (s = h.next) == null就是用來(lái)判斷 2 剛執(zhí)行成功但還未執(zhí)行 3 這種情況的。這種情況第二個(gè)結(jié)點(diǎn)必然屬于其他線程。
當(dāng)前有優(yōu)先級(jí)更高的線程在隊(duì)列中等待時(shí),那么當(dāng)前線程將不會(huì)執(zhí)行 CAS 操作去獲取鎖,保證了線程獲取鎖的順序與加入同步隊(duì)列的順序一致,很好的保證了公平性,但也增加了獲取鎖的成本。
基于 FIFO 的同步隊(duì)列是怎樣實(shí)現(xiàn)非公平搶占鎖的
由 FIFO 隊(duì)列的特性知,先加入同步隊(duì)列等待的線程會(huì)比后加入的線程更靠近隊(duì)列的頭部,那么它將比后者更早的被喚醒,它也就能更早的得到鎖。從這個(gè)意義上,對(duì)于在同步隊(duì)列中等待的線程而言,它們獲得鎖的順序和加入同步隊(duì)列的順序一致,這顯然是一種公平模式。然而,線程并非只有在加入隊(duì)列后才有機(jī)會(huì)獲得鎖,哪怕同步隊(duì)列中已有線程在等待,非公平鎖的不公平之處就在于此?;乜聪路枪芥i的加鎖流程,線程在進(jìn)入同步隊(duì)列等待之前有兩次搶占鎖的機(jī)會(huì):
第一次是非重入式的獲取鎖,只有在當(dāng)前鎖未被任何線程占有(包括自身)時(shí)才能成功; 第二次是在進(jìn)入同步隊(duì)列前,包含所有情況的獲取鎖的方式
只有這兩次獲取鎖都失敗后,線程才會(huì)構(gòu)造結(jié)點(diǎn)并加入同步隊(duì)列等待。而線程釋放鎖時(shí)是先釋放鎖(修改 state 值),然后才喚醒后繼結(jié)點(diǎn)的線程的。試想下這種情況,線程 A 已經(jīng)釋放鎖,但還沒(méi)來(lái)得及喚醒后繼線程 C,而這時(shí)另一個(gè)線程 B 剛好嘗試獲取鎖,此時(shí)鎖恰好不被任何線程持有,它將成功獲取鎖而不用加入隊(duì)列等待。線程 C 此后才被喚醒嘗試獲取鎖,而此時(shí)鎖已經(jīng)被線程B搶占,故而其獲取失敗并繼續(xù)在隊(duì)列中等待。
如果以線程第一次嘗試獲取鎖到最后成功獲取鎖的次序來(lái)看,非公平鎖確實(shí)很不公平。因?yàn)樵陉?duì)列中等待很久的線程相比還未進(jìn)入隊(duì)列等待的線程并沒(méi)有優(yōu)先權(quán),甚至競(jìng)爭(zhēng)也處于劣勢(shì):在隊(duì)列中的等待的線程要等待前驅(qū)結(jié)點(diǎn)線程的喚醒,在獲取鎖之前還要檢查自己的前驅(qū)結(jié)點(diǎn)是否為頭結(jié)點(diǎn)。在鎖競(jìng)爭(zhēng)激烈的情況下,在隊(duì)列中等待的線程可能遲遲競(jìng)爭(zhēng)不到鎖。這也就非公平在高并發(fā)情況下會(huì)出現(xiàn)的饑餓問(wèn)題。
為什么非公平鎖性能好
非公平鎖對(duì)鎖的競(jìng)爭(zhēng)是搶占式的(對(duì)于已經(jīng)處于等待隊(duì)列中線程除外),線程在進(jìn)入等待隊(duì)列之前可以進(jìn)行兩次嘗試,這大大增加了獲取鎖的機(jī)會(huì)。這種好處體現(xiàn)在兩個(gè)方面:
線程不必加入等待隊(duì)列就可以獲得鎖,不僅免去了構(gòu)造結(jié)點(diǎn)并加入隊(duì)列的繁瑣操作,同時(shí)也節(jié)省了線程阻塞喚醒的開(kāi)銷,線程阻塞和喚醒涉及到線程上下文的切換和操作系統(tǒng)的系統(tǒng)調(diào)用,是非常耗時(shí)的。在高并發(fā)情況下,如果線程持有鎖的時(shí)間非常短,短到線程入隊(duì)阻塞的過(guò)程超過(guò)線程持有并釋放鎖的時(shí)間開(kāi)銷,那么這種搶占式特性對(duì)并發(fā)性能的提升會(huì)更加明顯; 減少 CAS 競(jìng)爭(zhēng)。如果線程必須要加入阻塞隊(duì)列才能獲取鎖,那入隊(duì)時(shí) CAS 競(jìng)爭(zhēng)將變得異常激烈,CAS 操作雖然不會(huì)導(dǎo)致失敗線程掛起,但不斷失敗自旋導(dǎo)致的對(duì) CPU 的浪費(fèi)也不能忽視。
AQS 在其他同步工具上的應(yīng)用
在 ReentrantLock 的自定義同步器實(shí)現(xiàn)中,同步狀態(tài)表示鎖被一個(gè)線程重復(fù)獲取的次數(shù)。除了 ReentrantLock,AQS 也被大量應(yīng)用在其他同步工具上。
ReentrantReadWriteLock:ReentrantReadWriteLock類使用 AQS 同步狀態(tài)中的低 16 位來(lái)保存寫鎖持有的次數(shù),高16位用來(lái)保存讀鎖的持有次數(shù)。由于 WriteLock 也是一種獨(dú)占鎖,因此其構(gòu)建方式同 ReentrantLock。ReadLock 則通過(guò)使用 acquireShared 方法來(lái)支持同時(shí)允許多個(gè)讀線程。

Semaphore:Semaphore類(信號(hào)量)使用 AQS 同步狀態(tài)來(lái)保存信號(hào)量的當(dāng)前計(jì)數(shù)。它里面定義的 acquireShared 方法會(huì)減少計(jì)數(shù),或當(dāng)計(jì)數(shù)為非正值時(shí)阻塞線程;tryRelease 方法會(huì)增加計(jì)數(shù),在計(jì)數(shù)為正值時(shí)還要解除線程的阻塞。
CountDownLatch:CountDownLatch 類使用 AQS 同步狀態(tài)來(lái)表示計(jì)數(shù)。當(dāng)該計(jì)數(shù)為 0 時(shí),所有的 acquire 操作(對(duì)應(yīng)到CountDownLatch中就是 await 方法)才能通過(guò)。
FutureTask:FutureTask 類使用 AQS 同步狀態(tài)來(lái)表示某個(gè)異步計(jì)算任務(wù)的運(yùn)行狀態(tài)(初始化、運(yùn)行中、被取消和完成)。設(shè)置(FutureTask 的 set 方法)或取消(FutureTask 的 cancel 方法)一個(gè) FutureTask 時(shí)會(huì)調(diào)用 AQS 的 release 操作,等待計(jì)算結(jié)果的線程的阻塞解除是通過(guò) AQS 的 acquire 操作實(shí)現(xiàn)的。
SynchronousQueues:SynchronousQueues 類使用了內(nèi)部的等待節(jié)點(diǎn),這些節(jié)點(diǎn)可以用于協(xié)調(diào)生產(chǎn)者和消費(fèi)者。同時(shí),它使用 AQS 同步狀態(tài)來(lái)控制當(dāng)某個(gè)消費(fèi)者消費(fèi)當(dāng)前一項(xiàng)時(shí),允許一個(gè)生產(chǎn)者繼續(xù)生產(chǎn),反之亦然。
