1. 面試官:從源碼角度講講ReentrantLock及隊(duì)列同步器(AQS)

        共 34825字,需瀏覽 70分鐘

         ·

        2021-08-03 18:49

        你知道的越多,不知道的就越多,業(yè)余的像一棵小草!

        你來(lái),我們一起精進(jìn)!你不來(lái),我和你的競(jìng)爭(zhēng)對(duì)手一起精進(jìn)!

        編輯:業(yè)余草

        blog.csdn.net/fuzhongmin05

        推薦:https://www.xttblog.com/?p=5178

        JDK 中獨(dú)占鎖(排他鎖)的實(shí)現(xiàn)除了使用關(guān)鍵字 synchronized 外,還可以使用ReentrantLock。雖然在性能上 ReentrantLock 和 synchronized 沒(méi)有什么大區(qū)別,但 ReentrantLock 相比 synchronized 而言功能更加豐富,使用起來(lái)更為靈活,也更適合復(fù)雜的并發(fā)場(chǎng)景。

        ReentrantLock 常常對(duì)比著 synchronized 來(lái)分析,我們先對(duì)比著來(lái)看然后再一點(diǎn)一點(diǎn)分析。

        1. synchronized 是獨(dú)占鎖,加鎖和解鎖的過(guò)程自動(dòng)進(jìn)行,易于操作,但不夠靈活。ReentrantLock 也是獨(dú)占鎖,加鎖和解鎖的過(guò)程需要手動(dòng)進(jìn)行,不易操作,但非常靈活。
        2. synchronized 可重入,因?yàn)榧渔i和解鎖自動(dòng)進(jìn)行,不必?fù)?dān)心最后是否釋放鎖;ReentrantLock 也可重入,但加鎖和解鎖需要手動(dòng)進(jìn)行,且次數(shù)需一樣,否則其他線程無(wú)法獲得鎖。
        3. synchronized 不可響應(yīng)中斷,一個(gè)線程獲取不到鎖就一直等著;ReentrantLock 可以相應(yīng)中斷。

        ReentrantLock 好像比 synchronized 關(guān)鍵字沒(méi)好太多,我們?cè)偃タ纯?synchronized 所沒(méi)有的。一個(gè)最主要的就是 ReentrantLock 還可以實(shí)現(xiàn)公平鎖機(jī)制。什么叫公平鎖呢?也就是在鎖上等待時(shí)間最長(zhǎng)的線程將獲得鎖的使用權(quán)。通俗的理解就是誰(shuí)排隊(duì)時(shí)間最長(zhǎng)誰(shuí)先執(zhí)行獲取鎖。

        在講解 ReentrantLock 之前,必須先要了解一個(gè)叫 AQS(隊(duì)列同步器)的東西,這個(gè)東西也是并發(fā)包的,AQS 就是AbstractQueuedSynchronizer,它是 Java 并發(fā)包中的一個(gè)核心,ReentrantLock 以及ReentrantReadWriteLock都是基于 AQS 實(shí)現(xiàn)的。想要搞明白 ReentrantLock 是如何實(shí)現(xiàn)的,就必須先學(xué)習(xí)AbstractQueuedSynchronizer。本文結(jié)合 JDK1.8 的源碼先剖析下隊(duì)列同步器,然后再詳解 ReentrantLock 中如何應(yīng)用隊(duì)列同步器實(shí)現(xiàn)獨(dú)占鎖的。

        隊(duì)列同步器(AQS)

        隊(duì)列同步器 AbstractQueuedSynchronizer,是用來(lái)構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,它使用了一個(gè) volatile 修飾的 int 成員變量表示同步狀態(tài),通過(guò)內(nèi)置的 FIFO 隊(duì)列來(lái)完成資源獲取線程的排隊(duì)工作。

        同步器的主要使用方式是繼承,子類通過(guò)繼承同步器并實(shí)現(xiàn)它的抽象方法來(lái)管理同步狀態(tài),在抽象方法的實(shí)現(xiàn)過(guò)程中免不了要對(duì)同步狀態(tài)進(jìn)行更改,這時(shí)就需要同步器提供的 3 個(gè)方法getState()、setState(int newState)compareAndSetState(int expect, int update)來(lái)進(jìn)行操作,因?yàn)樗鼈兡軌虮WC狀態(tài)的改變是安全的。子類推薦被定義為自定義同步組件的靜態(tài)內(nèi)部類,同步器自身沒(méi)有實(shí)現(xiàn)任何同步接口,它僅僅是定義了若干同步狀態(tài)獲取和釋放方法來(lái)供自定義同步組件使用,同步器即可以支持獨(dú)占式獲取同步狀態(tài),也可以支持共享式地獲取同步狀態(tài),這樣方便實(shí)現(xiàn)不同類型的同步組件(ReentrantLock、ReentrantReadWriteLock、CountDownLatch等)。

        AQS 是實(shí)現(xiàn)鎖(也可以是任何同步組件)的關(guān)鍵:在鎖中聚合同步器,利用同步器實(shí)現(xiàn)鎖的語(yǔ)義。兩者的關(guān)系:鎖是面向使用者的,他定義了使用者與鎖交互的接口(比如允許兩個(gè)線程并行訪問(wèn)),隱藏了實(shí)現(xiàn)細(xì)節(jié);同步器是面向鎖的實(shí)現(xiàn)者,它簡(jiǎn)化了鎖的實(shí)現(xiàn)方式,屏蔽了同步管理狀態(tài)、線程的排隊(duì)、等待與喚醒等底層操作。

        隊(duì)列同步器的接口及模板方法

        同步器基于模板設(shè)計(jì)模式實(shí)現(xiàn)的,使用者需要繼承同步器并重寫指定的方法,隨后將同步器組合在自定義的同步組件的實(shí)現(xiàn)中,并調(diào)用同步器提供的模板方法,而這些模板方法會(huì)調(diào)用使用者重寫的方法。

        重寫同步器指定方法時(shí)需要使用同步器提供的如下三個(gè)方法來(lái)訪問(wèn)或修改同步狀態(tài):

        • getState():獲取當(dāng)前同步狀態(tài)
        • setState(int new State):設(shè)置當(dāng)前同步狀態(tài)
        • compareAndState(int expect,int update):使用 CAS 設(shè)置當(dāng)前狀態(tài),該方法能夠保證狀態(tài)設(shè)置的原子性。

        同步器可重寫的方法:

        實(shí)現(xiàn)自定義同步組件時(shí),將會(huì)調(diào)用AbstractQueuedSynchronizer自身已經(jīng)寫好的模板方法,這些模板方法與描述:

        同步器可重寫的方法

        注:模板方法基本分為三類:獨(dú)占式同步狀態(tài)獲取與釋放、共享式同步狀態(tài)獲取與釋放和查詢同步隊(duì)列中等待線程情況。

        AbstractQueuedSynchronizer要求子類必須覆寫的方法如下,之所以要求子類重寫這些方法,是為了讓使用者可以在其中加入自己的判斷邏輯。

        同步器提供的模版方法

        AQS 中提供了很多關(guān)于鎖的實(shí)現(xiàn)方法

        • getState():獲取鎖的標(biāo)志 state 值
        • setState():設(shè)置鎖的標(biāo)志 state 值
        • tryAcquire(int):獨(dú)占方式獲取鎖。嘗試獲取資源,成功則返回 true,失敗則返回 false
        • tryRelease(int):獨(dú)占方式釋放鎖。嘗試釋放資源,成功則返回 true,失敗則返回 false

        隊(duì)列同步器數(shù)據(jù)結(jié)構(gòu)

        同步器依賴于內(nèi)部的同步隊(duì)列(一個(gè)FIFO雙向隊(duì)列)來(lái)完成同步狀態(tài)的管理,當(dāng)前線程獲取同步狀態(tài)失敗時(shí),同步器會(huì)將當(dāng)前線程以及等待狀態(tài)等信息構(gòu)成一個(gè)節(jié)點(diǎn)(Node)并將其加入同步隊(duì)列,同時(shí)阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時(shí),會(huì)將首節(jié)點(diǎn)中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。

        AbstractQueuedSynchronizer類中,Node 類是靜態(tài)內(nèi)部類,其源碼如下:

        static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;

            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */

            static final int PROPAGATE = -3;
            //等待狀態(tài)
            volatile int waitStatus;
            //指向前一個(gè)結(jié)點(diǎn)的指針
            volatile Node prev;
            //指向后一個(gè)結(jié)點(diǎn)的指針
            volatile Node next;
            //當(dāng)前結(jié)點(diǎn)代表的線程
            volatile Thread thread;
            //等待隊(duì)列中的后繼節(jié)點(diǎn),如果當(dāng)前節(jié)點(diǎn)是共享的,那么這個(gè)字段將是SHARED常量,即節(jié)點(diǎn)類型(獨(dú)占和共享)和等待隊(duì)列中的后繼節(jié)點(diǎn)共用同一個(gè)字段
            Node nextWaiter;

            /**
             * Returns true if node is waiting in shared mode.
             */

            final boolean isShared() {
                return nextWaiter == SHARED;
            }


            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }

            Node() {    // Used to establish initial head or SHARED marker
            }

            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }

            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }

        同時(shí),在AbstractQueuedSynchronizer類中,又單獨(dú)定義了隊(duì)列頭結(jié)點(diǎn)、尾結(jié)點(diǎn)、同步狀態(tài)變量。

        //指向隊(duì)列頭結(jié)點(diǎn)
        private transient volatile Node head;
        //指向隊(duì)列尾結(jié)點(diǎn)
        private transient volatile Node tail;
        //同步狀態(tài)變量
        private volatile int state;
        protected final int getState() {
            return state;
        }
        protected final void setState(int newState) {
            state = newState;
        }
        protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }

        同步隊(duì)列的結(jié)構(gòu)如圖:

        同步隊(duì)列的結(jié)構(gòu)

        為了接下來(lái)能夠更好的理解加鎖和解鎖過(guò)程的源碼,對(duì)該同步隊(duì)列的特性進(jìn)行簡(jiǎn)單的說(shuō)明:

        1. 同步隊(duì)列是個(gè)先進(jìn)先出(FIFO)隊(duì)列,獲取鎖失敗的線程將構(gòu)造結(jié)點(diǎn)并加入隊(duì)列的尾部,加入隊(duì)列的過(guò)程必須保證線程安全,為什么必須保證線程安全?因?yàn)橐鎸?duì)同時(shí)有多條線程沒(méi)有獲取到同步狀態(tài)要加入同步隊(duì)列尾部的情況;
        2. 隊(duì)列首結(jié)點(diǎn)是獲取同步狀態(tài)成功的線程節(jié)點(diǎn);
        3. 前驅(qū)結(jié)點(diǎn)線程釋放鎖后將嘗試喚醒后繼結(jié)點(diǎn)中處于阻塞狀態(tài)的線程

        同步器將節(jié)點(diǎn)加入同步隊(duì)列的過(guò)程:

        同步器將節(jié)點(diǎn)加入同步隊(duì)列的過(guò)程

        同步隊(duì)列遵循 FIFO,首節(jié)點(diǎn)是獲取同步狀態(tài)成功的節(jié)點(diǎn),首節(jié)點(diǎn)的線程在釋放同步狀態(tài)時(shí),將會(huì)喚醒后繼節(jié)點(diǎn),而后繼節(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)將自己設(shè)置為新的首節(jié)點(diǎn)。

        同步隊(duì)列遵循 FIFO

        注:設(shè)置首節(jié)點(diǎn)是通過(guò)由已經(jīng)獲取到了同步狀態(tài)的線程來(lái)完成的,由于只有一個(gè)線程能夠獲取到同步狀態(tài),因此設(shè)置頭節(jié)點(diǎn)的方法并不需要 CAS 來(lái)保障,它只需要讓head 指針指向原首節(jié)點(diǎn)的后繼節(jié)點(diǎn)并斷開(kāi)原首節(jié)點(diǎn)的 next 引用即可。

        隊(duì)列同步器提供的獨(dú)占式同步狀態(tài)獲取方法

        通過(guò)AbstractQueuedSynchronizer類提供的acquire(int arg)方法可以獲取同步狀態(tài),該方法對(duì)中斷不敏感,也就是說(shuō)由于線程獲取同步狀態(tài)失敗后進(jìn)入同步隊(duì)列中,后繼對(duì)線程進(jìn)行中斷操作時(shí),線程不會(huì)從同步隊(duì)列移除。acquire 方法:

        public final void acquire(int arg) {
            //tryAcquie()方法具體要交給子類去實(shí)現(xiàn),AbstractQueuedSynchronizer類中不實(shí)現(xiàn)
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

        上述代碼中完成了同步狀態(tài)的獲取、節(jié)點(diǎn)構(gòu)造、加入同步隊(duì)列以及同步隊(duì)列中自旋等待的相關(guān)工作。

        首先調(diào)用自定義同步器(AbstractQueuedSynchronizer的子類)實(shí)現(xiàn)的tryAcquire(int arg)方法,該方法保證線程安全的獲取同步狀態(tài),如果同步狀態(tài)獲取失敗,則構(gòu)造同步節(jié)點(diǎn)(獨(dú)占式Node.EXCLUSIVE,同一時(shí)刻只能有一個(gè)線程成功獲取同步狀態(tài))并通過(guò)addWaiter(Node node)方法將該節(jié)點(diǎn)加入到同步隊(duì)列的尾部,最后調(diào)用acquireQueued(Node node,int arg)方法,使得該節(jié)點(diǎn)以“死循環(huán)”的方式獲取同步狀態(tài)。如果獲取不到就阻塞節(jié)點(diǎn)中的線程,而被阻塞線程的喚醒主要依靠前驅(qū)節(jié)點(diǎn)的出隊(duì)或阻塞線程被中斷來(lái)實(shí)現(xiàn)。

        節(jié)點(diǎn)的構(gòu)造以及加入同步隊(duì)列依靠于 addWaiter 和 enq 方法:

        private Node addWaiter(Node mode) {
         首先創(chuàng)建一個(gè)新節(jié)點(diǎn),并將當(dāng)前線程實(shí)例封裝在內(nèi)部,mode這里為null
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            ///入隊(duì)的邏輯
            enq(node);
            return node;
        }

        /**
         * 隊(duì)列不空時(shí)向尾部添加結(jié)點(diǎn)的邏輯在enq(node)方法中也有,之所以會(huì)有這部分“重復(fù)代碼”是對(duì)某些特殊情況進(jìn)行提前處理,犧牲一定的代碼可讀性換取性能提升。
         */

        private Node enq(final Node node) {
            for (;;) {
             //t指向當(dāng)前隊(duì)列的最后一個(gè)節(jié)點(diǎn),隊(duì)列為空則為null
                Node t = tail;
                //隊(duì)列為空
                if (t == null) { 
                //此時(shí)鏈表沒(méi)有節(jié)點(diǎn),需要初始化讓head跟tail都指向一個(gè)哨兵節(jié)點(diǎn)
                //構(gòu)造新結(jié)點(diǎn),CAS方式設(shè)置為隊(duì)列首元素,當(dāng)head==null時(shí)更新成功
                    if (compareAndSetHead(new Node())) 
                        tail = head;//尾指針指向首結(jié)點(diǎn)
                } else {  //隊(duì)列不為空
                    node.prev = t;
                    if (compareAndSetTail(t, node)) { //CAS將尾指針指向當(dāng)前結(jié)點(diǎn),當(dāng)t(原來(lái)的尾指針)==tail(當(dāng)前真實(shí)的尾指針)時(shí)執(zhí)行成功
                        t.next = node;    //原尾結(jié)點(diǎn)的next指針指向當(dāng)前結(jié)點(diǎn)
                        return t;
                    }
                }
            }
        }

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                //死循環(huán),正常情況下線程只有獲得鎖才能跳出循環(huán)
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                     //將當(dāng)前結(jié)點(diǎn)設(shè)置為隊(duì)列頭結(jié)點(diǎn)
                        setHead(node);
                        p.next = null// help GC
                        failed = false;
                        //正常情況下死循環(huán)唯一的出口
                        return interrupted;
                    }
                    //判斷是否要阻塞當(dāng)前線程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

        看完enq(final Node node)方法后,發(fā)現(xiàn)事實(shí)上 AQS 隊(duì)列的頭節(jié)點(diǎn)其實(shí)是個(gè)哨兵節(jié)點(diǎn)。

        enq(final Node node)中,同步器通過(guò)死循環(huán)的方式來(lái)確保節(jié)點(diǎn)的添加,在死循環(huán)中只有通過(guò) CAS 將當(dāng)前節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)之后,當(dāng)前線程才能從該方法返回,否則的話當(dāng)前線程不斷地嘗試設(shè)置。enq(final Node node)方法將并發(fā)添加節(jié)點(diǎn)的請(qǐng)求通過(guò) CAS 變得“串行化”了。循環(huán)加 CAS 操作是實(shí)現(xiàn)樂(lè)觀鎖的標(biāo)準(zhǔn)方式,CAS 是為了實(shí)現(xiàn)原子操作而出現(xiàn)的,所謂的原子操作指操作執(zhí)行期間,不會(huì)受其他線程的干擾。Java 實(shí)現(xiàn)的 CAS 是調(diào)用 unsafe 類提供的方法,底層是調(diào)用 C++ 方法,直接操作內(nèi)存,在 CPU 層面加鎖,直接對(duì)內(nèi)存進(jìn)行操作。

        來(lái)看shouldParkAfterFailedAcquire(Node pred, Node node),從方法名上我們可以大概猜出這是判斷是否要阻塞當(dāng)前線程的,源碼如下:

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL) //狀態(tài)為SIGNAL

                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */

                return true;
            if (ws > 0) { //狀態(tài)為CANCELLED,
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */

                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else { //狀態(tài)為初始化狀態(tài)(ReentrentLock語(yǔ)境下)
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */

                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

        可以看到針對(duì)前驅(qū)結(jié)點(diǎn) pred 的狀態(tài)會(huì)進(jìn)行不同的處理:

        1. pred 狀態(tài)為 SIGNAL,則返回 true,表示要阻塞當(dāng)前線程;
        2. pred 狀態(tài)為 CANCELLED,則一直往隊(duì)列頭部回溯直到找到一個(gè)狀態(tài)不為CANCELLED 的結(jié)點(diǎn),將當(dāng)前節(jié)點(diǎn) node 掛在這個(gè)結(jié)點(diǎn)的后面;
        3. pred 的狀態(tài)為初始化狀態(tài),此時(shí)通過(guò) CAS 操作將 pred 的狀態(tài)改為 SIGNAL

        其實(shí)這個(gè)方法的含義很簡(jiǎn)單,就是確保當(dāng)前結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)的狀態(tài)為 SIGNAL,SIGNAL 意味著線程釋放鎖后會(huì)喚醒后面阻塞的線程。畢竟,只有確保能夠被喚醒,當(dāng)前線程才能放心的阻塞。

        要注意只有在前驅(qū)結(jié)點(diǎn)已經(jīng)是 SIGNAL 狀態(tài)后才會(huì)執(zhí)行后面的方法立即阻塞,對(duì)應(yīng)上面的第一種情況。其他兩種情況則因?yàn)榉祷?false 而重新執(zhí)行一遍

        acquireQueued()方法的源碼表明節(jié)點(diǎn)在進(jìn)入隊(duì)列后,就進(jìn)入了一個(gè)自旋狀態(tài),每個(gè)節(jié)點(diǎn)(或者說(shuō)每個(gè)線程),都在自省觀察,當(dāng)條件滿足,獲取到同步狀態(tài),就可以從這個(gè)自旋過(guò)程中退出,否則依舊留在自旋過(guò)程中,

        這個(gè)過(guò)程如下圖所示。

        自旋狀態(tài)

        acquireQueued(final Node node, int arg)方法中,線程在“死循環(huán)”中嘗試獲取同步狀態(tài),而只有前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)時(shí)才能夠嘗試獲取同步狀態(tài),原因如下:

        1. 頭節(jié)點(diǎn)是成功獲取到同步狀態(tài)的節(jié)點(diǎn),而頭節(jié)點(diǎn)線程獲取到同步狀態(tài)后,將會(huì)喚醒其后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)的線程在被喚醒后需要檢查自己的前驅(qū)節(jié)點(diǎn)是否是頭節(jié)點(diǎn);
        2. 維護(hù)同步隊(duì)列的 FIFO 原則

        可以看到節(jié)點(diǎn)與及節(jié)點(diǎn)之間在循環(huán)檢查的過(guò)程中基本上不相互通信,而是簡(jiǎn)單地判斷自己的前驅(qū)是否為頭節(jié)點(diǎn),這樣就使得節(jié)點(diǎn)的釋放符合 FIFO,并且對(duì)于方便對(duì)過(guò)早通知進(jìn)行處理。

        獨(dú)占式同步狀態(tài)獲取流程如下圖,也就是acquire(int arg)方法的執(zhí)行流程:

        獨(dú)占式同步狀態(tài)獲取流程

        當(dāng)同步狀態(tài)獲取成功,當(dāng)前線程從acquire(int arg)方法返回,這也就代表著當(dāng)前線程獲得了鎖。

        隊(duì)列同步器提供的獨(dú)占式同步狀態(tài)釋放方法

        釋放同步狀態(tài),通過(guò)調(diào)用同步器的release(int arg)方法可以釋放同步狀態(tài),該方法在釋放了同步狀態(tài)后,會(huì)喚醒其后繼節(jié)點(diǎn)(進(jìn)而使后繼節(jié)點(diǎn)重新嘗試獲取同步狀態(tài))。解鎖的源碼相對(duì)簡(jiǎn)單,代碼如下:

        public final boolean release(int arg) {
            //tryRelease()方法具體要交給子類去實(shí)現(xiàn),AbstractQueuedSynchronizer類中不實(shí)現(xiàn)
            if (tryRelease(arg)) {
                Node h = head;
                //當(dāng)前隊(duì)列不為空且頭結(jié)點(diǎn)狀態(tài)不為初始化狀態(tài)(0)
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h); //喚醒同步隊(duì)列中被阻塞的線程
                return true;
            }
            return false;
        }

        若當(dāng)前線程已經(jīng)完全釋放鎖,即鎖可被其他線程使用,則還應(yīng)該喚醒后續(xù)等待線程。不過(guò)在此之前需要進(jìn)行兩個(gè)條件的判斷:

        • h != null是為了防止隊(duì)列為空,即沒(méi)有任何線程處于等待隊(duì)列中,那么也就不需要進(jìn)行喚醒的操作;
        • h.waitStatus != 0是為了防止隊(duì)列中雖有線程,但該線程還未阻塞,由前面的分析知,線程在阻塞自己前必須設(shè)置其前驅(qū)結(jié)點(diǎn)的狀態(tài)為 SIGNAL,否則它不會(huì)阻塞自己;

        該方法執(zhí)行時(shí),會(huì)喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程,unparkSuccerssor(Node node)方法使用 LcokSupport 來(lái)喚醒處于等待(阻塞)狀態(tài)的線程。

        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);

            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                //從尾部向頭部遍歷
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }

        一般情況下只要喚醒后繼結(jié)點(diǎn)的線程就行了,但是后繼結(jié)點(diǎn)可能已經(jīng)取消等待,所以從隊(duì)列尾部往前回溯,找到離頭結(jié)點(diǎn)最近的正常結(jié)點(diǎn),并喚醒其線程。

        獨(dú)占式同步狀態(tài)釋放流程如下圖,也就是release(int arg)方法的執(zhí)行流程:

        獨(dú)占式同步狀態(tài)釋放流程

        獨(dú)占式同步狀態(tài)獲取和釋放:

        • 在獲取同步狀態(tài)時(shí),同步器會(huì)維持一個(gè)同步隊(duì)列,獲取失敗的線程都會(huì)被加入到同步隊(duì)列中,并在同步隊(duì)列中自旋(判斷自己前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn))。
        • 移出隊(duì)列(停止自旋)的條件是前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時(shí),同步器調(diào)用tryRelease(int arg)方法釋放同步狀態(tài),然后喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)。

        共享式同步狀態(tài)獲取與釋放

        共享式獲取與獨(dú)占式獲取的區(qū)別:同一時(shí)刻能否有多個(gè)線程同時(shí)獲取到同步狀態(tài)。

        以文件的讀寫為例:

        1. 如果有一個(gè)程序在讀文件,那么這一時(shí)刻的寫操作均被阻塞,而讀操作能夠同時(shí)進(jìn)行。
        2. 如果有一個(gè)程序在寫文件,那么這一時(shí)刻不管是其他的寫操作還是讀操作,均被阻塞。
        3. 寫操作要求對(duì)資源的獨(dú)占式訪問(wèn),而讀操作可以是共享式訪問(wèn)。

        調(diào)用同步器的acquireShared()模板方法,可以實(shí)現(xiàn)共享式獲取同步狀態(tài)。

        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }

        private void doAcquireShared(int arg) {
            //當(dāng)前節(jié)點(diǎn)加入同步隊(duì)列
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    //取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
                    final Node p = node.predecessor();
                    //前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn),就繼續(xù)嘗試獲取同步狀態(tài)
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null// help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    //用LockSupport的park方法把當(dāng)前線程阻塞住
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
        • 當(dāng)前線程首先調(diào)用tryAcquireShared()這個(gè)被子類重寫的方法,共享式的獲取同步狀態(tài)。如果返回值大于等于 0,表示獲取成功并返回。
        • 如果返回值小于0表示獲取失敗,調(diào)用 doAcquireShared() 方法,讓線程進(jìn)入自旋狀態(tài)。
        • 自旋過(guò)程中,如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn),且調(diào)用tryAcquireShared()方法返回值大于等于 0,則退出自旋。否則,繼續(xù)進(jìn)行自旋。
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }

        首先去嘗試釋放資源tryReleaseShared(arg),如果釋放成功了,就代表有資源空閑出來(lái),那么就用 doReleaseShared() 去喚醒后續(xù)結(jié)點(diǎn)。

        ReentrantLock 對(duì)隊(duì)列同步器的應(yīng)用

        syncronized 關(guān)鍵字隱式的支持重進(jìn)入,比如 syncronized 修飾一個(gè)遞歸方法,在方法執(zhí)行時(shí),執(zhí)行線程在獲取了鎖之后仍能連續(xù)多次地獲取該鎖。

        ReentrantLock 雖然沒(méi)能像 synchronized 關(guān)鍵字一樣支持隱式的重進(jìn)入,但是在調(diào)用 lock() 方法時(shí),已經(jīng)獲取了鎖的線程,能夠再次調(diào)用 lock() 方法而不被阻塞。要實(shí)現(xiàn)可重入的特性,就要解決以下兩個(gè)問(wèn)題:

        • 線程再次獲取鎖,鎖需要去識(shí)別獲取鎖的線程是否為當(dāng)前占據(jù)鎖的線程,如果是,則再次成功獲取
        • 鎖的最終釋放,線程重復(fù) n 次獲取了鎖,隨后在第 n 次釋放該鎖后,其他線程能夠獲取到鎖。鎖的最終釋放要求鎖對(duì)于獲取進(jìn)行自增,對(duì)于釋放進(jìn)行自減,當(dāng)計(jì)數(shù)等于0時(shí)表示鎖已經(jīng)成功釋放。

        ReentrantLock 通過(guò)組合自定義隊(duì)列同步器來(lái)實(shí)現(xiàn)鎖的可重入式獲取與釋放。

        ReentrantLock 的類圖如下,可以看出 ReentrantLock 實(shí)現(xiàn) Lock 接口,Sync 與 ReentrantLock 是組合關(guān)系,且 FairSync(公平鎖)、NonfairySync(非公平鎖)是 Sync 的子類。

        FairSync、NonfairySync、Sync

        公平鎖是 FairSync,非公平鎖是 NonfairSync。而不管是 FairSync 還是 NonfariSync,都間接繼承自 AbstractQueuedSynchronizer 這個(gè)抽象類,如下圖所示

        NonfariSync 的類圖

        ReentrantLock 非公平模式獲取及釋放鎖

        ReentrantLock 非公平模式下的獲取鎖的代碼如下:

        //實(shí)現(xiàn)Lock接口的lock方法,調(diào)用本方法當(dāng)前線程獲取鎖,拿鎖成功后就返回
        public void lock() {
         //非公平模式下,sync指向的對(duì)象類型是NonfairSync
            sync.lock();
        }
        //實(shí)現(xiàn)Lock接口的tryLock方法,嘗試非阻塞的獲取鎖,調(diào)用本方法后立刻返回,如果能獲取到鎖則返回true,否則返回false
        public boolean tryLock() {
            return sync.nonfairTryAcquire(1);
        }

        靜態(tài)內(nèi)部類 NonfairSync 的源碼如下:

        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;

            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */

            final void lock() {
             //首先CAS嘗試下獲取鎖,先假設(shè)每次lock都是非重入
                if (compareAndSetState(01))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                 //這里調(diào)用的是父類AbstractQueuedSynchronizer的acquire()方法,
                 //而acquire()方法中又要調(diào)用交由子類去實(shí)現(xiàn)的tryAcquiretryAcquire()方法
                 //所以會(huì)調(diào)到下面NonfairSync類的tryAcquire()方法
                    acquire(1);
            }

            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }

        nonfairTryAcquire()方法的源碼如下:

        //本方法寫在Sync類中,而不是FairSync類中
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
             //如果當(dāng)前鎖閑置,就CAS嘗試下獲取鎖
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0// overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        nonfairTryAcquire()方法增加了再次獲取同步狀態(tài)的處理邏輯:通過(guò)判斷當(dāng)前線程是否為已經(jīng)獲取了鎖的線程來(lái)決定獲取操作是否成功,如果是則將同步狀態(tài)值增加并返回 true,表示獲取同步狀態(tài)成功。

        ReentrantLock 類的 unlock 方法:

        public void unlock() {
         //這里調(diào)用的是父類AbstractQueuedSynchronizer的release()方法,
         //而release()方法中又要調(diào)用交由子類去實(shí)現(xiàn)的tryRelease()方法
         //所以會(huì)調(diào)到Sync類的tryRelease()方法
            sync.release(1);
        }

        由于公平鎖與非公平鎖的差異主要體現(xiàn)在獲取鎖上,因此 tryAcquire() 方法由NonfairSync 類與 FairSync 類分別去實(shí)現(xiàn),而無(wú)論是公平鎖還是非公平鎖,鎖的釋放過(guò)程都是一樣的,因此 tryRelease() 方法由 Sync 類來(lái)實(shí)現(xiàn)。源碼如下:

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        如果該鎖被獲取了 N 次,那么前 (N-1) 次tryRelease(int releases)方法必須返回 false,而只有同步狀態(tài)完全釋放了,才能返回 true。

        ReentrantLock 公平模式獲取及釋放鎖

        公平鎖模式下,對(duì)鎖的獲取有嚴(yán)格的條件限制。在同步隊(duì)列有線程等待的情況下,所有線程在獲取鎖前必須先加入同步隊(duì)列,隊(duì)列中的線程按加入隊(duì)列的先后次序獲得鎖。

        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;

            final void lock() {
                //這里調(diào)用的是父類AbstractQueuedSynchronizer的acquire()方法
                //父類的acquire()方法中會(huì)調(diào)用tryAcquire()方法,該方法由子類去實(shí)現(xiàn)
                //即最終會(huì)調(diào)用到FairSync實(shí)現(xiàn)的tryAcquire()方法
                acquire(1);
            }

            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */

            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                 //在真正CAS獲取鎖之前加了hasQueuedPredecessors()方法
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    //溢出了
                    if (nextc < 0
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }

        hasQueuedPredecessors()方法在AbstractQueuedSynchronizer類中(模板方法模式的典型應(yīng)用,所有公共的方法全部都由AbstractQueuedSynchronizer寫好,只有個(gè)性化邏輯才下沉給子類去實(shí)現(xiàn)),源碼如下:

        public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }

        從方法名我們就可知道這是判斷隊(duì)列中是否有優(yōu)先級(jí)更高的等待線程,隊(duì)列中哪個(gè)線程優(yōu)先級(jí)最高?由于頭結(jié)點(diǎn)是當(dāng)前獲取鎖的線程,隊(duì)列中的第二個(gè)結(jié)點(diǎn)代表的線程優(yōu)先級(jí)最高。那么只要判斷隊(duì)列中第二個(gè)結(jié)點(diǎn)是否存在以及這個(gè)結(jié)點(diǎn)是否代表當(dāng)前線程就行了。這里分了兩種情況進(jìn)行探討:

        • 第二個(gè)結(jié)點(diǎn)已經(jīng)完全插入,但是這個(gè)結(jié)點(diǎn)是否就是當(dāng)前線程所在結(jié)點(diǎn)還未知,所以通過(guò)s.thread != Thread.currentThread()進(jìn)行判斷,如果為 true,說(shuō)明第二個(gè)結(jié)點(diǎn)代表其他線程。

        • 第二個(gè)結(jié)點(diǎn)并未完全插入,我們知道結(jié)點(diǎn)入隊(duì)一共分三步:

          1. 待插入結(jié)點(diǎn)的 pre 指針指向原尾結(jié)點(diǎn);
          2. CAS 更新尾指針;
          3. 原尾結(jié)點(diǎn)的 next 指針指向新插入結(jié)點(diǎn)。所以(s = h.next) == null就是用來(lái)判斷 2 剛執(zhí)行成功但還未執(zhí)行 3 這種情況的。這種情況第二個(gè)結(jié)點(diǎn)必然屬于其他線程。

        當(dāng)前有優(yōu)先級(jí)更高的線程在隊(duì)列中等待時(shí),那么當(dāng)前線程將不會(huì)執(zhí)行 CAS 操作去獲取鎖,保證了線程獲取鎖的順序與加入同步隊(duì)列的順序一致,很好的保證了公平性,但也增加了獲取鎖的成本。

        基于 FIFO 的同步隊(duì)列是怎樣實(shí)現(xiàn)非公平搶占鎖的

        由 FIFO 隊(duì)列的特性知,先加入同步隊(duì)列等待的線程會(huì)比后加入的線程更靠近隊(duì)列的頭部,那么它將比后者更早的被喚醒,它也就能更早的得到鎖。從這個(gè)意義上,對(duì)于在同步隊(duì)列中等待的線程而言,它們獲得鎖的順序和加入同步隊(duì)列的順序一致,這顯然是一種公平模式。然而,線程并非只有在加入隊(duì)列后才有機(jī)會(huì)獲得鎖,哪怕同步隊(duì)列中已有線程在等待,非公平鎖的不公平之處就在于此?;乜聪路枪芥i的加鎖流程,線程在進(jìn)入同步隊(duì)列等待之前有兩次搶占鎖的機(jī)會(huì):

        • 第一次是非重入式的獲取鎖,只有在當(dāng)前鎖未被任何線程占有(包括自身)時(shí)才能成功;
        • 第二次是在進(jìn)入同步隊(duì)列前,包含所有情況的獲取鎖的方式

        只有這兩次獲取鎖都失敗后,線程才會(huì)構(gòu)造結(jié)點(diǎn)并加入同步隊(duì)列等待。而線程釋放鎖時(shí)是先釋放鎖(修改 state 值),然后才喚醒后繼結(jié)點(diǎn)的線程的。試想下這種情況,線程 A 已經(jīng)釋放鎖,但還沒(méi)來(lái)得及喚醒后繼線程 C,而這時(shí)另一個(gè)線程 B 剛好嘗試獲取鎖,此時(shí)鎖恰好不被任何線程持有,它將成功獲取鎖而不用加入隊(duì)列等待。線程 C 此后才被喚醒嘗試獲取鎖,而此時(shí)鎖已經(jīng)被線程B搶占,故而其獲取失敗并繼續(xù)在隊(duì)列中等待。

        如果以線程第一次嘗試獲取鎖到最后成功獲取鎖的次序來(lái)看,非公平鎖確實(shí)很不公平。因?yàn)樵陉?duì)列中等待很久的線程相比還未進(jìn)入隊(duì)列等待的線程并沒(méi)有優(yōu)先權(quán),甚至競(jìng)爭(zhēng)也處于劣勢(shì):在隊(duì)列中的等待的線程要等待前驅(qū)結(jié)點(diǎn)線程的喚醒,在獲取鎖之前還要檢查自己的前驅(qū)結(jié)點(diǎn)是否為頭結(jié)點(diǎn)。在鎖競(jìng)爭(zhēng)激烈的情況下,在隊(duì)列中等待的線程可能遲遲競(jìng)爭(zhēng)不到鎖。這也就非公平在高并發(fā)情況下會(huì)出現(xiàn)的饑餓問(wèn)題。

        為什么非公平鎖性能好

        非公平鎖對(duì)鎖的競(jìng)爭(zhēng)是搶占式的(對(duì)于已經(jīng)處于等待隊(duì)列中線程除外),線程在進(jìn)入等待隊(duì)列之前可以進(jìn)行兩次嘗試,這大大增加了獲取鎖的機(jī)會(huì)。這種好處體現(xiàn)在兩個(gè)方面:

        • 線程不必加入等待隊(duì)列就可以獲得鎖,不僅免去了構(gòu)造結(jié)點(diǎn)并加入隊(duì)列的繁瑣操作,同時(shí)也節(jié)省了線程阻塞喚醒的開(kāi)銷,線程阻塞和喚醒涉及到線程上下文的切換和操作系統(tǒng)的系統(tǒng)調(diào)用,是非常耗時(shí)的。在高并發(fā)情況下,如果線程持有鎖的時(shí)間非常短,短到線程入隊(duì)阻塞的過(guò)程超過(guò)線程持有并釋放鎖的時(shí)間開(kāi)銷,那么這種搶占式特性對(duì)并發(fā)性能的提升會(huì)更加明顯;
        • 減少 CAS 競(jìng)爭(zhēng)。如果線程必須要加入阻塞隊(duì)列才能獲取鎖,那入隊(duì)時(shí) CAS 競(jìng)爭(zhēng)將變得異常激烈,CAS 操作雖然不會(huì)導(dǎo)致失敗線程掛起,但不斷失敗自旋導(dǎo)致的對(duì) CPU 的浪費(fèi)也不能忽視。

        AQS 在其他同步工具上的應(yīng)用

        在 ReentrantLock 的自定義同步器實(shí)現(xiàn)中,同步狀態(tài)表示鎖被一個(gè)線程重復(fù)獲取的次數(shù)。除了 ReentrantLock,AQS 也被大量應(yīng)用在其他同步工具上。

        ReentrantReadWriteLockReentrantReadWriteLock類使用 AQS 同步狀態(tài)中的低 16 位來(lái)保存寫鎖持有的次數(shù),高16位用來(lái)保存讀鎖的持有次數(shù)。由于 WriteLock 也是一種獨(dú)占鎖,因此其構(gòu)建方式同 ReentrantLock。ReadLock 則通過(guò)使用 acquireShared 方法來(lái)支持同時(shí)允許多個(gè)讀線程。

        AQS 同步狀態(tài)

        Semaphore:Semaphore類(信號(hào)量)使用 AQS 同步狀態(tài)來(lái)保存信號(hào)量的當(dāng)前計(jì)數(shù)。它里面定義的 acquireShared 方法會(huì)減少計(jì)數(shù),或當(dāng)計(jì)數(shù)為非正值時(shí)阻塞線程;tryRelease 方法會(huì)增加計(jì)數(shù),在計(jì)數(shù)為正值時(shí)還要解除線程的阻塞。

        CountDownLatch:CountDownLatch 類使用 AQS 同步狀態(tài)來(lái)表示計(jì)數(shù)。當(dāng)該計(jì)數(shù)為 0 時(shí),所有的 acquire 操作(對(duì)應(yīng)到CountDownLatch中就是 await 方法)才能通過(guò)。

        FutureTask:FutureTask 類使用 AQS 同步狀態(tài)來(lái)表示某個(gè)異步計(jì)算任務(wù)的運(yùn)行狀態(tài)(初始化、運(yùn)行中、被取消和完成)。設(shè)置(FutureTask 的 set 方法)或取消(FutureTask 的 cancel 方法)一個(gè) FutureTask 時(shí)會(huì)調(diào)用 AQS 的 release 操作,等待計(jì)算結(jié)果的線程的阻塞解除是通過(guò) AQS 的 acquire 操作實(shí)現(xiàn)的。

        SynchronousQueues:SynchronousQueues 類使用了內(nèi)部的等待節(jié)點(diǎn),這些節(jié)點(diǎn)可以用于協(xié)調(diào)生產(chǎn)者和消費(fèi)者。同時(shí),它使用 AQS 同步狀態(tài)來(lái)控制當(dāng)某個(gè)消費(fèi)者消費(fèi)當(dāng)前一項(xiàng)時(shí),允許一個(gè)生產(chǎn)者繼續(xù)生產(chǎn),反之亦然。

        瀏覽 42
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
          
          

            1. 男人桶女人叽叽 | 黑人做爰巨大40cm | 成人亚洲精品一区二区三区嫩花 | 成人在线黄色视频 | 婷婷一区二区三区 |