1. 同步組件Semaphore源碼解析

        共 9077字,需瀏覽 19分鐘

         ·

        2023-06-25 22:36

        走過路過不要錯(cuò)過

        點(diǎn)擊藍(lán)字關(guān)注我們


        Semaphore概述及案例學(xué)習(xí)

        Semaphore信號(hào)量用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理地使用公共資源。

        public class SemaphoreTest {

        private static final int THREAD_COUNT = 30;
        private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

        private static Semaphore s = new Semaphore(10); //10個(gè)許可證數(shù)量,最大并發(fā)數(shù)為10

        public static void main(String[] args) {
        for(int i = 0; i < THREAD_COUNT; i ++){ //執(zhí)行30個(gè)線程
        threadPool.execute(new Runnable() {
        @Override
        public void run() {
        s.tryAcquire(); //嘗試獲取一個(gè)許可證
        System.out.println("save data");
        s.release(); //使用完之后歸還許可證
        }
        });
        }
        threadPool.shutdown();
        }
        }


        • 創(chuàng)建一個(gè)大小為30的線程池,但是信號(hào)量規(guī)定在10,保證許可證數(shù)量為10。

        • 每次線程調(diào)用tryAcquire()或者acquire()方法都會(huì)原子性的遞減許可證的數(shù)量,release()會(huì)原子性遞增許可證數(shù)量。

        類圖結(jié)構(gòu)及重要字段

        public class Semaphore implements java.io.Serializable {
        private static final long serialVersionUID = -3222578661600680210L;
        /** All mechanics via AbstractQueuedSynchronizer subclass */
        private final Sync sync;

        abstract static class Sync extends AbstractQueuedSynchronizer {
        // permits指定初始化信號(hào)量個(gè)數(shù)
        Sync(int permits) {
        setState(permits);
        }
        // ...
        }

        static final class NonfairSync extends Sync {...}

        static final class FairSync extends Sync {...}

        // 默認(rèn)采用非公平策略
        public Semaphore(int permits) {
        sync = new NonfairSync(permits);
        }

        // 可以指定公平策略
        public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }

        //...
        }


        • 基于AQS,類似于ReentrantLock,Sync繼承自AQS,有公平策略和非公平策略兩種實(shí)現(xiàn)。

        • 類似于CountDownLatch,state在這里也是通過構(gòu)造器指定,表示初始化信號(hào)量的個(gè)數(shù)。

        本篇文章閱讀需要建立在一定的AQS基礎(chǔ)之上,這邊推薦幾篇前置文章,可以瞅一眼:

        void acquire()

        調(diào)用該方法時(shí),表示希望獲取一個(gè)信號(hào)量資源,相當(dāng)于acquire(1)

        如果當(dāng)前信號(hào)量個(gè)數(shù)大于0,CAS將當(dāng)前信號(hào)量值減1,成功后直接返回。

        如果當(dāng)前信號(hào)量個(gè)數(shù)等于0,則當(dāng)前線程將被置入AQS的阻塞隊(duì)列。

        該方法是響應(yīng)中斷的,其他線程調(diào)用了該線程的interrupt()方法,將會(huì)拋出中斷異常返回。

            // Semaphore.java
        public void acquire() throws InterruptedException {
        // 傳遞的 arg 為 1 , 獲取1個(gè)信號(hào)量資源
        sync.acquireSharedInterruptibly(1);
        }
        // AQS.java
        public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 線程被 中斷, 拋出中斷異常
        if (Thread.interrupted())
        throw new InterruptedException();
        // 子類實(shí)現(xiàn), 公平和非公平兩種策略
        if (tryAcquireShared(arg) < 0)
        // 如果獲取失敗, 則置入阻塞隊(duì)列,
        // 再次進(jìn)行嘗試, 嘗試失敗則掛起當(dāng)前線程
        doAcquireSharedInterruptibly(arg);
        }


        非公平

            static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
        super(permits);
        }

        protected int tryAcquireShared(int acquires) {
        // 這里直接調(diào)用Sync定義的 非公平共享模式獲取方法
        return nonfairTryAcquireShared(acquires);
        }
        }

        abstract static class Sync extends AbstractQueuedSynchronizer {

        final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
        // 獲取當(dāng)前信號(hào)量的值
        int available = getState();
        // 減去需要獲取的值, 得到剩余的信號(hào)量個(gè)數(shù)
        int remaining = available - acquires;
        // 不剩了,表示當(dāng)前信號(hào)量個(gè)數(shù)不能滿足需求, 返回負(fù)數(shù), 線程置入AQS阻塞
        // 還有的剩, CAS設(shè)置當(dāng)前信號(hào)量值為剩余值, 并返回剩余值
        if (remaining < 0 ||
        compareAndSetState(available, remaining))
        return remaining;
        }
        }
        }


        你會(huì)發(fā)現(xiàn),非公平策略是無法保證【AQS隊(duì)列中阻塞的線程】和【當(dāng)前線程】獲取的順序的,當(dāng)前線程是有可能在排隊(duì)的線程之前就拿到資源,產(chǎn)生插隊(duì)現(xiàn)象。

        公平策略就不一樣了,它會(huì)通過hasQueuedPredecessors()方法看看隊(duì)列中是否存在前驅(qū)節(jié)點(diǎn),以保證公平性。

        公平策略

            static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
        super(permits);
        }

        protected int tryAcquireShared(int acquires) {
        for (;;) {
        // 如果隊(duì)列中在此之前已經(jīng)有線程在排隊(duì)了,直接放棄獲取
        if (hasQueuedPredecessors())
        return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
        compareAndSetState(available, remaining))
        return remaining;
        }
        }
        }


        void acquire(int permits)

        在acquire()的基礎(chǔ)上,指定了獲取信號(hào)量的數(shù)量permits。

            public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
        }


        void acquireUninterruptibly()

        該方法與acquire()類似,但是不響應(yīng)中斷。

            public void acquireUninterruptibly() {
        sync.acquireShared(1);
        }

        public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
        }


        void acquireUninterruptibly(int permits)

        該方法與acquire(permits)類似,但是不響應(yīng)中斷。

            public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
        }


        boolean tryAcquire()

        tryAcquire和acquire非公平策略公用一個(gè)邏輯,但是區(qū)別在于,如果獲取信號(hào)量失敗,或者CAS失敗,將會(huì)直接返回false,而不會(huì)置入阻塞隊(duì)列中。

        一般try開頭的方法的特點(diǎn)就是這樣,嘗試一下,成功是最好,失敗也不至于被阻塞,而是立刻返回false。

            public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
        }
        abstract static class Sync extends AbstractQueuedSynchronizer {
        final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
        compareAndSetState(available, remaining))
        return remaining;
        }
        }
        }


        boolean tryAcquire(int permits)

        相比于普通的tryAcquire(),指定了permits的值。

            public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
        }


        boolean tryAcquire(int permits, long timeout, TimeUnit unit)

        相比于tryAcquire(int permits),增加了超時(shí)控制。

            public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
        }


        void release()

        將信號(hào)量值加1,如果有線程因?yàn)檎{(diào)用acquire方法而被阻塞在AQS阻塞隊(duì)列中,將根據(jù)公平策略選擇一個(gè)信號(hào)量個(gè)數(shù)滿足需求的線程喚醒,線程喚醒后也會(huì)嘗試獲取新增的信號(hào)量。

        參考文章:Java并發(fā)包源碼學(xué)習(xí)系列:AQS共享模式獲取與釋放資源

            // Semaphore.java
        public void release() {
        sync.releaseShared(1);
        }
        // AQS.java
        public final boolean releaseShared(int arg) {
        // 嘗試釋放鎖
        if (tryReleaseShared(arg)) {
        // 釋放成功, 喚醒AQS隊(duì)列里面最先掛起的線程
        // https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112386838
        doReleaseShared();
        return true;
        }
        return false;
        }
        // Semaphore#Sync.java
        abstract static class Sync extends AbstractQueuedSynchronizer {
        protected final boolean tryReleaseShared(int releases) {
        for (;;) {
        // 獲取當(dāng)前信號(hào)量
        int current = getState();
        // 期望加上releases
        int next = current + releases;
        if (next < current) // overflow
        throw new Error("Maximum permit count exceeded");
        // CAS操作,更新
        if (compareAndSetState(current, next))
        return true;
        }
        }
        }


        void release(int permits)

        release()相比指定了permits的值。

            public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
        }


        其他方法

        Semaphore還提供其他一些方法,實(shí)現(xiàn)比較簡單,這邊就簡單寫一下吧:

            // 返回此信號(hào)量中當(dāng)前可用的許可證數(shù)量, 其實(shí)就是得到當(dāng)前的 state值  getState()
        public int availablePermits() {
        return sync.getPermits();
        }

        // 將state更新為0, 返回0
        public int drainPermits() {
        return sync.drainPermits();
        }

        // 減少reduction個(gè)許可證
        protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
        }

        // 判斷公平策略
        public boolean isFair() {
        return sync instanceof FairSync;
        }

        // 判斷是否有線程證在等待獲取許可證
        public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
        }

        // 返回正在等待獲取許可證的線程數(shù)
        public final int getQueueLength() {
        return sync.getQueueLength();
        }

        // 返回所有等待獲取許可證的線程集合
        protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
        }


        總結(jié)

        Semaphore信號(hào)量用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理地使用公共資源。

        • 基于AQS,類似于ReentrantLock,Sync繼承自AQS,有公平策略和非公平策略兩種實(shí)現(xiàn)。

        • 類似于CountDownLatch,state在這里也是通過構(gòu)造器指定,表示初始化信號(hào)量的個(gè)數(shù)。

        每次線程調(diào)用tryAcquire()或者acquire()方法都會(huì)原子性的遞減許可證的數(shù)量,release()會(huì)原子性遞增許可證數(shù)量,只要有許可證就可以重復(fù)使用




        想進(jìn)大廠的小伙伴請(qǐng)注意,

        大廠面試的套路很神奇,

        早做準(zhǔn)備對(duì)大家更有好處,

        埋頭刷題效率低,

        看面經(jīng)會(huì)更有效率!

        小編準(zhǔn)備了一份大廠常問面經(jīng)匯總集

        剩下的就不會(huì)給大家一展出來了,以上資料按照一下操作即可獲得

        ——將文章進(jìn)行轉(zhuǎn)發(fā)評(píng)論關(guān)注公眾號(hào)【Java烤豬皮】,關(guān)注后繼續(xù)后臺(tái)回復(fù)領(lǐng)取口令“ 666 ”即可免費(fèi)領(lǐng)文章取中所提供的資料。




        往期精品推薦



        騰訊、阿里、滴滴后臺(tái)試題匯集總結(jié) — (含答案)

        面試:史上最全多線程序面試題!

        最新阿里內(nèi)推Java后端試題

        JVM難學(xué)?那是因?yàn)槟銢]有真正看完整這篇文章


        結(jié)束


        關(guān)注作者微信公眾號(hào) — 《JAVA烤豬皮》


        了解了更多java后端架構(gòu)知識(shí)以及最新面試寶典



        看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者來源不斷出文的動(dòng)力~

        瀏覽 54
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
          
          

            1. 夜夜夜夜夜撸 | 在线无限看黄 免费无码 | 亚洲无码影片 | 潘金莲和武松做爰h | 午夜久久网站 |