這波對AbstractQueuedSynchronizer的解析,我給自己打99分!
“?寫給自己看,說給別人聽。你好,這是think123的第789篇原創(chuàng)文章”
如何看待AQS?
看了Synchronized的實現(xiàn)方式之后,再來看JDK的AQS,感覺就比較簡單了,它的行為有點像銀行排隊,銀行有很多窗口處理很多業(yè)務(wù),不同的窗口處理不同的業(yè)務(wù)。
比如有個人業(yè)務(wù),也有金融業(yè)務(wù),也有公司業(yè)務(wù)等,每個窗口都有很多人排隊。一般來講當(dāng)你在窗口處理業(yè)務(wù)的時候是不會有人來打擾你的,不一般的時候是什么時候呢?那就是資料分發(fā)窗口,誰來了都給你一張資料,一邊看去吧。
而且當(dāng)你去處理業(yè)務(wù)的時候,你可能有些資料忘記帶了,然后你又要重新去取資料,取完之后回來繼續(xù)排隊。
所以說,代碼源于生活,古人誠不欺我。
帶著問題看源碼
AQS中是如何實現(xiàn)線程阻塞的? AQS為什么支持多個Condition? 線程喚醒的時候順序是怎樣的?
帶著問題去看源碼比一猛子扎進源碼的海洋好,所以我建議你帶著這幾個問題去看AQS源碼。
CLH隊列
AQS是一個抽象類,很多類都是基于它來實現(xiàn)自己的特有的功能的,比如Lock,Semaphore,CountDownLatch等。
我們都知道對于一個共享變量,為了線程安全,同一時刻肯定只能有一個線程線程可以寫這個變量,也就是只有一個線程能拿到鎖。
那么那些沒有拿到鎖的線程是怎么被阻塞的呢?
AQS中維護了一個基于鏈表實現(xiàn)的FIFO隊列(官方叫它CLH鎖),那些沒有獲取到鎖的線程會被封裝后放入都在這個隊列里面,我們來看下這個隊列的定義
static?final?class?Node?{
??//?共享模式
??static?final?Node?SHARED?=?new?Node();
??
??//?排它模式,默認
??static?final?Node?EXCLUSIVE?=?null;
??//?當(dāng)前線程的一個等待狀態(tài),默認值為0
???volatile?int?waitStatus;
??//?前一個節(jié)點
??volatile?Node?prev;
??//?后一個節(jié)點
??volatile?Node?next;
??//?入隊到這個節(jié)點的線程
??volatile?Thread?thread;
??//?下一個等待condition的節(jié)點或者是特殊的節(jié)點值:?SHARED
??Node?nextWaiter;
}
這里共享模式和排它模式是什么意思呢?排它模式是表示同時只能有一個線程獲取到鎖執(zhí)行,而共享模式表示可以同時有多個線程執(zhí)行,比如Semaphore/CountDownLatch。
waitStatus除了默認值外還有三個值,分別代表不同的含義
CANCELLED = 1 : 表示當(dāng)前節(jié)點已經(jīng)不能在獲取鎖了,當(dāng)前節(jié)點由于超時或者中斷而進入該狀態(tài),進入該狀態(tài)的節(jié)點狀態(tài)不會再發(fā)生變化,同時后續(xù)還會從隊列中移除。 SIGNAL = -1 : 表示當(dāng)前節(jié)點需要去喚醒后繼節(jié)點。后繼節(jié)點入隊時,會將當(dāng)前節(jié)點的狀態(tài)更新為SIGNAL CONDITION = -2 : 當(dāng)前節(jié)點處于條件隊列中,當(dāng)其他線程調(diào)用了condition.signal()后,節(jié)點會從條件隊列轉(zhuǎn)義到同步隊列中 PROPAGATE = -3 :當(dāng)共享鎖釋放的時候,這個狀態(tài)會被傳遞到其他后繼節(jié)點
其實原本的CLH隊列是沒有prev這個指針的,它存在的意義是那些取消鎖獲取的線程需要被移除掉。
Synchronized的wait只能有一個條件隊列(WaitSet),而AQS則是支持多個條件隊列,其中條件隊列的關(guān)鍵數(shù)據(jù)結(jié)構(gòu)如下:
?public?class?ConditionObject?implements?Condition,?java.io.Serializable?{
??
??//?condition?queue中的頭節(jié)點
??private?transient?Node?firstWaiter;
??
??//?condition?queue中的尾節(jié)點
??private?transient?Node?lastWaiter;
??/**
???*?創(chuàng)建一個新的condition
???*/
??public?ConditionObject()?{?}
??//?類似于Object的notify
??public?final?void?signal()?{
????...
??}
??
??//?類似于Object的wait
??public?final?void?await()?throws?InterruptedException?{
????...
??}
?}
AQS支持多個Condition的原因就是每個ConditionObject都對應(yīng)一個隊列,所以它可以支持多個Condition。
如果多個線程請求同一把鎖,有的線程阻塞在請求鎖上面,有的鎖阻塞到等待某個條件成立。那么當(dāng)持有鎖的線程讓出鎖的時候,哪個線程應(yīng)該獲取到鎖呢?
當(dāng)獲取了鎖的線程調(diào)用了signal()時,它又會不會從條件隊列(condition queue)中出隊一個node,加入到同步隊列中呢?
答案是會的。
如果我說AQS分析完畢,你們會不會打我?
哈哈哈,各位放下鍵盤,扶我起來,我還能在水一會兒。
AQS中除了這兩個類之外,還有AQS本身還有幾個重要的屬性,它們共同構(gòu)成了AQS這個框架類的基礎(chǔ)結(jié)構(gòu)
??//?wait?queue(或者叫做同步隊列)的頭節(jié)點
??private?transient?volatile?Node?head;
??//?wait?queue(或者叫同步隊列)的尾節(jié)點
??private?transient?volatile?Node?tail;
??//?資源
??private?volatile?int?state;
看到了head和tail,是不是一個雙向鏈表的含義躍然紙上了。
AQS 中的state變量,對不同的子類有不同的含義,該變量對 ReentrantLock 來說表示加鎖狀態(tài),對Semaphore來說則是信號量的數(shù)量??傊鼈兌际腔贏QS中的state來實現(xiàn)各自獨有功能的。
這里的state我覺得更應(yīng)該叫做資源,就類似于去銀行排隊時取的票。對于ReentrantLock來說,這個資源只有1個,對于Semaphore或者CountDownLatch來說資源可以有很多個。
Unsafe講解
unsafe是一個非常強大的類,通過它提供的API,我們可以操作內(nèi)存,修改對象屬性值,甚至內(nèi)存屏障,加鎖解鎖都能通過它完成

圖片來自于: https://tech.meituan.com/2019/02/14/talk-about-java-magic-class-unsafe.html
AQS中就使用了unsafe來實現(xiàn)線程的阻塞以及修改屬性的值,這里把AQS中涉及到unsafe的方法列出來
//?阻塞當(dāng)前線程
void?park(boolean?isAbsolute,?long?time);
//?解鎖指定的線程
void?unpark(Object?thread);
//?獲取字段的偏移地址
long?objectFieldOffset(Field?f);
//?CAS更新對象的值為x,當(dāng)現(xiàn)在的值是expected
boolean?compareAndSwapObject(Object?o,?long?offset,
???????????????????????????????????????Object?expected,
???????????????????????????????????????Object?x);
//?CAS更新
boolean?compareAndSwapInt(Object?o,?long?offset,
????????????????????????????????????int?expected,
????????????????????????????????????int?x);
park和unpark的底層實現(xiàn)其實還是之前我們講解過的pthread,這里不再講解,感興趣的可以去看看之前關(guān)于synchronized的文章。
UNSAFE_ENTRY(void,?Unsafe_Park(JNIEnv?*env,?jobject?unsafe,?
????jboolean?isAbsolute,?jlong?time))
...
?thread->parker()->park(isAbsolute?!=?0,?time);
...
UNSAFE_END
修改對象的屬性值,我們用一段代碼來演示
public?class?UnSafeDemo?{
??public?static?void?main(String[]?args)?throws?Exception?{
????Field?theUnsafe?=?Unsafe.class.getDeclaredField("theUnsafe");
????theUnsafe.setAccessible(true);
????Unsafe?unsafe?=?(Unsafe)?theUnsafe.get(null);
????Student?student?=?new?Student();
????student.setAge(18);
????long?ageOffset?=?unsafe.objectFieldOffset(
????????????????????Student.class.getDeclaredField("age"));
????//?cas修改student的age為20
????unsafe.compareAndSwapInt(student,?ageOffset,?18,?20);
????System.out.println(student.getAge());
??}
}
class?Student?{
??int?age;
??public?int?getAge()?{
??????return?age;
??}
??public?void?setAge(int?age)?{
??????this.age?=?age;
??}
}
上面代碼的輸出結(jié)果為 20,我們通過首先通過objectFieldOffset獲取到age屬性的內(nèi)存偏移地址,然后通過compareAndSwapInt將age屬性的值修改為20。
比如對于一維數(shù)組a[10],基地址是0x000000,那么a[1]的內(nèi)存地址就是0x000001 = 0x000000(基地址) + 1(偏移量)
源碼分析
其他類如果要想將AQS作為基類以實現(xiàn)自己的功能,只需要根據(jù)需求實現(xiàn)以下方法就行了
??//?嘗試獲取鎖
??protected?boolean?tryAcquire(int?arg)?{
????throw?new?UnsupportedOperationException();
??}
??//?嘗試釋放鎖
??protected?boolean?tryRelease(int?arg)?{
????throw?new?UnsupportedOperationException();
??}
??//?嘗試獲取共享鎖
??protected?int?tryAcquireShared(int?arg)?{
????throw?new?UnsupportedOperationException();
??}
??//?嘗試釋放共享鎖
??protected?boolean?tryReleaseShared(int?arg)?{
????throw?new?UnsupportedOperationException();
??}
??//?鎖是否被獨占
??protected?boolean?isHeldExclusively()?{
????throw?new?UnsupportedOperationException();
??}
AQS中獲取鎖的方法是在acquire()中,釋放鎖的方法是release(),如果我們想要實現(xiàn)自定義的鎖的時候,只需要根據(jù)自己的需求實現(xiàn)對應(yīng)的tryXXX方法就行了,為了簡化分析,我們只分析獨占鎖的源碼。
獲取鎖
public?final?void?acquire(int?arg)?{
??if?(!tryAcquire(arg)?&&
????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))
????selfInterrupt();
}
我們假設(shè)tryAcquire返回false(不關(guān)心子類實現(xiàn)),也就是現(xiàn)在鎖已經(jīng)被其他線程占有了?,F(xiàn)在又有線程來獲取鎖肯定是會獲取失敗的,所以失敗的線程會被封裝成Node插入到隊列中
private?Node?addWaiter(Node?mode)?{
??
??//?將nextWaiter設(shè)置為EXCLUSIVE,表示節(jié)點正以獨占模式等待
??Node?node?=?new?Node(Thread.currentThread(),?mode);
??//?Try?the?fast?path?of?enq;?backup?to?full?enq?on?failure
??Node?pred?=?tail;
??if?(pred?!=?null)?{
??????node.prev?=?pred;
??????//?CAS將tail節(jié)點更新為node
??????if?(compareAndSetTail(pred,?node))?{
??????????pred.next?=?node;
??????????return?node;
??????}
??}
??//?如果執(zhí)行到這里就存在兩種情況
??//?1.?pred?==?null,表示是第一次插入,當(dāng)前同步隊列中沒有其他線程
??//?2.?表明有其他線程修改了tail的值,導(dǎo)致CAS修改tail失敗
??enq(node);
??return?node;
}
//?CAS更新tail節(jié)點
private?Node?enq(final?Node?node)?{
??for?(;;)?{
??????Node?t?=?tail;
??????//?處理第一次插入的情況,這里head?=?new?Node()
??????if?(t?==?null)?{
??????????if?(compareAndSetHead(new?Node()))
??????????????tail?=?head;
??????}?else?{
??????????//?處理競爭激烈的情況或者第一次插入后未建立鏈路關(guān)系的情況
??????????node.prev?=?t;
??????????if?(compareAndSetTail(t,?node))?{
??????????????t.next?=?node;
??????????????return?t;
??????????}
??????}
??}
}
上面邏輯很簡單,就是構(gòu)造一個排它鎖模式的node,插入隊列中(尾插入)。
但是你尤其需要注意到enq中,當(dāng)節(jié)點第一次插入的時候,head的值是new Node(),它是一個虛擬節(jié)點,這個節(jié)點本身沒有可運行的線程。
在鏈表中,使用這種虛擬節(jié)點很正常,有時候更加有利于操作。
enq執(zhí)行完成后就形成了這樣的鏈路關(guān)系:

acquireQueued的源碼如下
final?boolean?acquireQueued(final?Node?node,?int?arg)?{
??boolean?failed?=?true;
??try?{
????boolean?interrupted?=?false;
????for?(;;)?{
??????final?Node?p?=?node.predecessor();
??????//?1.?如果前一個節(jié)點是頭節(jié)點,則嘗試再次獲取鎖(機會更大)
??????if?(p?==?head?&&?tryAcquire(arg))?{
??????????setHead(node);
??????????p.next?=?null;?//?help?GC
??????????failed?=?false;
??????????return?interrupted;
??????}
??????//?2.?獲取鎖失敗,需要檢查并更新節(jié)點狀態(tài)
??????if?(shouldParkAfterFailedAcquire(p,?node)?&&
??????????parkAndCheckInterrupt())
??????????interrupted?=?true;
????}
??}?finally?{
??????//?3.?如果線程被中斷,需要將節(jié)點從隊列中移除
??????if?(failed)
??????????cancelAcquire(node);
??}
}
邏輯也比較簡單明了,在線程被阻塞之前,如果前一個節(jié)點是頭節(jié)點(head),那么在嘗試去獲取一次鎖,如果成功了就直接返回。
如果還是失敗的話,就更新下節(jié)點狀態(tài),然后將線程阻塞
private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{
??int?ws?=?pred.waitStatus;
??//?前置節(jié)點已經(jīng)是SIGNAL了,可以放心的park了
??if?(ws?==?Node.SIGNAL)
??????return?true;
??if?(ws?>?0)?{
??????//?如果前置節(jié)點狀態(tài)是CANCELLED,需要將節(jié)點移除
??????do?{
??????????node.prev?=?pred?=?pred.prev;
??????}?while?(pred.waitStatus?>?0);
??????pred.next?=?node;
??}?else?{
??????//?這就是我們之前說的在入隊的時候,會通過CAS將前置節(jié)點的狀態(tài)設(shè)置為SIGNAL
??????compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);
??}
??return?false;
}
線程阻塞的代碼更簡單
private?final?boolean?parkAndCheckInterrupt()?{
??//?調(diào)用了unsafe的park阻塞當(dāng)前線程
??LockSupport.park(this);
??return?Thread.interrupted();
}
至此,獲取鎖的源碼就解析完成了。
從上面的代碼,我們可以知道 head --> next --> next --> tail 這樣的數(shù)據(jù)結(jié)構(gòu)組成了同步隊列,等待獲取鎖的線程會被封裝成node插入到隊列中去。
釋放鎖
release()中主要調(diào)用了unparkSuccessor()方法來喚醒后繼節(jié)點,源碼如下
//?傳入的node是head節(jié)點
private?void?unparkSuccessor(Node?node)?{
??
??//?1.?獲取結(jié)點狀態(tài)
??int?ws?=?node.waitStatus;
??//?2.?使用CAS更新waitStatus為默認值0
??if?(ws?0)
??????compareAndSetWaitStatus(node,?ws,?0);
??//?3.?通過循環(huán)找到后繼節(jié)點中狀態(tài)不為CANCELLED的節(jié)點
??Node?s?=?node.next;
??if?(s?==?null?||?s.waitStatus?>?0)?{
??????s?=?null;
??????for?(Node?t?=?tail;?t?!=?null?&&?t?!=?node;?t?=?t.prev)
??????????if?(t.waitStatus?<=?0)
??????????????s?=?t;
??}
??//?4.?調(diào)用unsafe解鎖線程
??if?(s?!=?null)
??????LockSupport.unpark(s.thread);
}
之前提到過head節(jié)點是虛擬節(jié)點,所以會調(diào)用unsafe.park去解鎖head.next節(jié)點
條件變量(Condition)
Java 語言內(nèi)置的管程(synchronized)里只有一個條件變量,而我們的AQS是支持多個條件變量的,Java中定義了一個接口,叫做Condition,AQS的內(nèi)部類ConditionObject實現(xiàn)了這個接口。
public?interface?Condition?{
?//?使得當(dāng)前線程被阻塞,直到收到信號(signal)或者線程被中斷
?void?await()?throws?InterruptedException;
?//?喚醒一個等待線程
?void?signal();
?
?//?喚醒所有的等待線程
?void?signalAll();
?//?省略其他方法
}
Condition中線程等待和通知需要調(diào)用await()、signal()、signalAll(),它們的語義和 wait()、notify()、notifyAll() 是相同的。但是不一樣的是,Condition里只能使用前面的 await()、signal()、signalAll(),而后面的 wait()、notify()、notifyAll() 只有在 synchronized 實現(xiàn)的管程里才能使用。
await
public?final?void?await()?throws?InterruptedException?{
????if?(Thread.interrupted())
????????throw?new?InterruptedException();
????//?將當(dāng)前線程構(gòu)造成一個waiter節(jié)點,waitStatue的值為-2(CONDITION),將它插入到條件隊列中
????Node?node?=?addConditionWaiter();
????//?釋放持有的鎖
????int?savedState?=?fullyRelease(node);
????int?interruptMode?=?0;
????//?如果不在同步隊列中,則阻塞當(dāng)前線程
????while?(!isOnSyncQueue(node))?{
????????LockSupport.park(this);
????????if?((interruptMode?=?checkInterruptWhileWaiting(node))?!=?0)
????????????break;
????}
????//?在同步隊列中,有可能被喚醒了,需要去重新獲取鎖。并處理中斷邏輯
????if?(acquireQueued(node,?savedState)?&&?interruptMode?!=?THROW_IE)
????????interruptMode?=?REINTERRUPT;
????if?(node.nextWaiter?!=?null)?//?clean?up?if?cancelled
????????unlinkCancelledWaiters();
????if?(interruptMode?!=?0)
????????reportInterruptAfterWait(interruptMode);
}
?private?Node?addConditionWaiter()?{
??Node?t?=?lastWaiter;
??
??//?如果lastWaiter不是出于condition狀態(tài),則需要移除掉
??if?(t?!=?null?&&?t.waitStatus?!=?Node.CONDITION)?{
??????unlinkCancelledWaiters();
??????t?=?lastWaiter;
??}
??//?將當(dāng)前線程構(gòu)建為一個condition?node
??Node?node?=?new?Node(Thread.currentThread(),?Node.CONDITION);
??if?(t?==?null)
??????firstWaiter?=?node;
??else
??????t.nextWaiter?=?node;
??//?修改lastWaiter的指向
??lastWaiter?=?node;
??return?node;
}
await()方法的邏輯也比較簡單,其執(zhí)行的主要流程如下
如果當(dāng)前線程已經(jīng)被中斷則拋出異常,未中斷則執(zhí)行步驟2 將當(dāng)前線程構(gòu)造為condition node,并插入到條件隊列中 如果condition node不在同步隊列中(有可能被喚醒后,移出條件隊列了),則調(diào)用unsafe.park阻塞當(dāng)前線程
通過以上代碼分析,await之后條件隊列會形成下面這樣的數(shù)據(jù)結(jié)構(gòu)
firstWaiter?-->?nextWaiter?-->?nextWaiter?-->?lastWaiter
這里的firsetWaiter并不是虛擬節(jié)點,當(dāng)只有一個線程在條件隊列中時,firstWaiter == node == lastWaiter
signal
public?final?void?signal()?{
??if?(!isHeldExclusively())
??????throw?new?IllegalMonitorStateException();
??//?獲取第一個等待節(jié)點,進行喚醒
??Node?first?=?firstWaiter;
??if?(first?!=?null)
??????doSignal(first);
}
private?void?doSignal(Node?first)?{
??do?{
??????//?將節(jié)點從條件隊列中移除
??????if?(?(firstWaiter?=?first.nextWaiter)?==?null)
??????????lastWaiter?=?null;
??????first.nextWaiter?=?null;
??}?while?(!transferForSignal(first)?&&
???????????(first?=?firstWaiter)?!=?null);
}
//?將節(jié)點從條件隊列移動到同步隊列
final?boolean?transferForSignal(Node?node)?{
?
??//?CAS修改waitStatus失敗,就證明node處于CANCELLED狀態(tài)
??if?(!compareAndSetWaitStatus(node,?Node.CONDITION,?0))
??????return?false;
??//?enq的源碼上面上面獲取鎖有提到過,就是將節(jié)點插入同步隊列并返回前置節(jié)點
??Node?p?=?enq(node);
??int?ws?=?p.waitStatus;
??//?如果前置節(jié)點狀態(tài)是CANCELLED或者修改waitStatus狀態(tài)為SIGNAL失敗,那么需要喚醒剛插入的節(jié)點
??if?(ws?>?0?||?!compareAndSetWaitStatus(p,?ws,?Node.SIGNAL))
??????LockSupport.unpark(node.thread);
??return?true;
}
其實signal的邏輯也是很簡單的,不知道為什么寫到signal的時候,我就想起了我當(dāng)時寫synchronized的時候,一樣的索然無味,無欲無求。我想可能是沒人給我點贊。
說回signal,它的主要邏輯如下
將第一個等待節(jié)點從等待隊列中移除 修改第一個等待節(jié)點的waitStatus為0 將節(jié)點入隊到同步隊列,并返回前一個節(jié)點 判斷前一個節(jié)點是否處于CANCELLED或者waitStatus能否正常修改,如果不能則解鎖剛?cè)腙牭墓?jié)點
你需要注意的是,signal并沒有釋放自己獲得的鎖。釋放鎖的操作仍然是通過release的。
總結(jié)
通過源碼的分析,我相信你一定可以回答上之前的三個問題。也讓我們知道了AQS和Synchronized其實內(nèi)在實現(xiàn)方式是很類似的。
AQS中有同步隊列和條件隊列,signal的時候是將節(jié)點從條件隊列轉(zhuǎn)移到同步隊列。Synchronized中有EntryList和WaitSet。notify的時候?qū)⒕€程從WaitSet移動到EntryList中。
同樣的,看源碼我個人更喜歡沿著一條線去看,而不是大而全的看,這樣更容易把控整個脈絡(luò)。
你的關(guān)注是對我最大的支持
老哥點個贊,順便關(guān)注下唄
