1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Java多線程之Semaphore信號(hào)量

        共 5411字,需瀏覽 11分鐘

         ·

        2021-11-30 14:22

        這里就JUC包中的Semaphore類做相關(guān)介紹

        abstract.jpeg

        概述

        JUC包中的Semaphore信號(hào)量作為一個(gè)并發(fā)工具類。其基本思想很簡(jiǎn)單,對(duì)于一個(gè)信號(hào)量實(shí)例而言,其含有指定數(shù)量的許可。每當(dāng)訪問資源前,需先向其申請(qǐng)?jiān)S可。并在處理完畢后釋放許可,以供后續(xù)申請(qǐng)。其實(shí),這個(gè)使用方式就很像現(xiàn)實(shí)世界的停車場(chǎng),即停車場(chǎng)有空余車位,車才可以進(jìn)車;否則要么等待要么離開(尋找下一個(gè)停車場(chǎng))。當(dāng)車從停車場(chǎng)的車位駛離時(shí),則會(huì)將相應(yīng)的車位就會(huì)空余出來。在整個(gè)過程停車場(chǎng)的車位資源是有限的固定的。常見的使用場(chǎng)景是對(duì)業(yè)務(wù)所使用的線程數(shù)進(jìn)行控制,即所謂基于線程數(shù)的限流方式。其常用方法及功能如下所示

        //?創(chuàng)建一個(gè)指定許可數(shù)的非公平信號(hào)量
        public?Semaphore(int?permits);

        //?創(chuàng)建一個(gè)指定許可數(shù)的公平/非公平信號(hào)量
        public?Semaphore(int?permits,?boolean?fair);

        //?釋放一個(gè)許可
        public?void?release();

        //?釋放指定數(shù)量的許可
        public?void?release(int?permits);

        //?當(dāng)前剩余可用的許可數(shù)量
        public?int?availablePermits();

        /***************************?獲取許可?******************************/

        //?阻塞等待,直到獲取一個(gè)許可
        public?void?acquire()?throws?InterruptedException;

        //?阻塞等待,直到獲取全部所需數(shù)量的許可
        public?void?acquire(int?permits)?throws?InterruptedException;

        //?阻塞等待(忽略InterruptedException異常),直到獲取一個(gè)許可
        public?void?acquireUninterruptibly();

        //?阻塞等待(忽略InterruptedException異常),直到獲取全部所需數(shù)量的許可
        public?void?acquireUninterruptibly(int?permits);

        //?非阻塞式獲取一個(gè)許可,?ture:?獲取成功;?false:?獲取失敗
        public?boolean?tryAcquire();

        //?非阻塞式獲取全部所需數(shù)量的許可,?ture:?獲取成功;?false:?獲取失敗
        public?boolean?tryAcquire(int?permits);

        //?支持超時(shí)機(jī)制的tryAcquire方法,?獲取一個(gè)許可,?ture:?獲取成功;?false:?獲取失敗
        public?boolean?tryAcquire(long?timeout,?TimeUnit?unit)?throws?InterruptedException;

        //?支持超時(shí)機(jī)制的tryAcquire方法,?獲取全部所需數(shù)量的許可,?ture:?獲取成功;?false:?獲取失敗
        public?boolean?tryAcquire(int?permits,?long?timeout,?TimeUnit?unit)?throws?InterruptedException;

        //?一次性獲取所有剩余可用的許可,?返回成功獲取的許可數(shù)
        public?int?drainPermits();

        /******************************************************************/

        可以看到,對(duì)于信號(hào)量而言,其支持公平和非公平兩種類型。默認(rèn)為非公平的。值得一提的是,對(duì)于tryAcquire()方法而言,其是非阻塞的。并且一旦存在可用的許可,會(huì)立即分配給它。不論是否存在其他正在等待許可的線程。即使當(dāng)前這個(gè)信號(hào)量實(shí)例是公平的,換言之tryAcquire()方法會(huì)破壞公平信號(hào)量實(shí)例的公平性。如果既期望使用非阻塞方式,又期望不破壞公平信號(hào)量的公平性,可以使用它的超時(shí)機(jī)制版本,同時(shí)將超時(shí)時(shí)間設(shè)為0。即 tryAcquire(0, TimeUnit.SECONDS) 。方法tryAcquire(int permits)同理,此處不再贅述

        基本實(shí)踐

        這里通過一個(gè)簡(jiǎn)單的實(shí)例,來進(jìn)行展示其基本的使用流程

        public?class?SemaphoreTest?{

        ????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss");

        ????//?系統(tǒng)最大的并發(fā)處理量
        ????private?static?Integer?maxLimit?=?5;

        ????@Test
        ????public?void?test1()?{
        ????????System.out.println("----------------------?系統(tǒng)上線?----------------------");
        ????????Semaphore?semaphore?=?new?Semaphore(maxLimit,?true);
        ????????ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);

        ????????IntStream.rangeClosed(1,8)
        ????????????.mapToObj(?num?->?new?UserReq("用戶#"+num,?semaphore)?)
        ????????????.forEach(?threadPool::execute?);

        ????????//?主線程等待所有任務(wù)執(zhí)行完畢
        ????????try{?Thread.sleep(?120*1000?);?}?catch?(Exception?e)?{}
        ????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
        ????}
        ????

        ????/**
        ?????*?打印信息
        ?????*?@param?msg
        ?????*/

        ????private?static?void?info(String?msg)?{
        ????????String?time?=?formatter.format(LocalTime.now());
        ????????String?log?=?"["+time+"]?"+?msg;
        ????????System.out.println(log);
        ????}

        ????@AllArgsConstructor
        ????private?static?class?UserReq?implements?Runnable{

        ????????private?String?name;

        ????????private?Semaphore?semaphore;

        ????????@Override
        ????????public?void?run()?{
        ????????????//?模擬用戶不定時(shí)發(fā)起請(qǐng)求
        ????????????try{?Thread.sleep(RandomUtils.nextLong(500,?2000));?}?catch?(Exception?e)?{}
        ????????????String?msg?=?name?+?":?發(fā)起請(qǐng)求,?系統(tǒng)可用資源數(shù):?"?+?semaphore.availablePermits();
        ????????????info(msg);

        ????????????//?阻塞等待,直到獲取許可
        ????????????try?{
        ????????????????semaphore.acquire();
        ????????????}catch?(InterruptedException?e)?{
        ????????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
        ????????????}

        ????????????info(name?+?":?系統(tǒng)開始處理請(qǐng)求");
        ????????????//?模擬業(yè)務(wù)耗時(shí)
        ????????????try{?Thread.sleep(RandomUtils.nextInt(5,?20)*1000);?}?catch?(Exception?e)?{}

        ????????????//?用戶請(qǐng)求處理完畢,釋放許可
        ????????????semaphore.release();
        ????????????info(name?+?":?系統(tǒng)處理完畢");
        ????????}
        ????}
        }

        測(cè)試結(jié)果如下,符合預(yù)期

        figure 1.jpeg

        實(shí)現(xiàn)原理

        構(gòu)造器

        Semaphore信號(hào)量類的實(shí)現(xiàn)過程同樣依賴于AQS。具體地,其是對(duì)AQS中共享鎖的使用。在構(gòu)建Semaphore實(shí)例過程時(shí),一方面,通過sync變量持有AQS的實(shí)現(xiàn)類Sync,同時(shí)按公平性與否進(jìn)一步地可細(xì)分為NonfairSync、FairSync;另一方面,通過AQS的state字段來存儲(chǔ)許可的數(shù)量

        public?class?Semaphore?implements?java.io.Serializable?{

        ????private?final?Sync?sync;

        ????public?Semaphore(int?permits)?{
        ????????sync?=?new?NonfairSync(permits);
        ????}

        ????public?Semaphore(int?permits,?boolean?fair)?{
        ????????sync?=?fair???new?FairSync(permits)?:?new?NonfairSync(permits);
        ????}

        ????abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
        ????????Sync(int?permits)?{
        ????????????setState(permits);
        ????????}
        ????}

        ????static?final?class?NonfairSync?extends?Sync?{
        ????????NonfairSync(int?permits)?{
        ????????????super(permits);
        ????????}
        ????}

        ????static?final?class?FairSync?extends?Sync?{
        ????????FairSync(int?permits)?{
        ????????????super(permits);
        ????????}
        ????}

        }

        acquire方法

        首先來看Semaphore的acquire()方法。其委托sync調(diào)用AQS的acquireSharedInterruptibly方法。而在AQS中通過調(diào)用tryAcquireShared方法判斷是否需要阻塞調(diào)用線程。具體地,在Semaphore的NonfairSync、FairSync內(nèi)部類分別實(shí)現(xiàn)了該tryAcquireShared方法的兩個(gè)版本:非公平、公平??梢钥吹絻煞N實(shí)現(xiàn)基本一致。tryAcquireShared如果返回負(fù)值,則說明當(dāng)前許可數(shù)不夠,當(dāng)前線程需要進(jìn)入AQS阻塞隊(duì)列;反之則獲取成功。只是在公平版本的實(shí)現(xiàn)中,會(huì)調(diào)用AQS的hasQueuedPredecessors方法來判斷是否有其他線程已經(jīng)在AQS隊(duì)列中進(jìn)行排隊(duì)。如果有,則tryAcquireShared直接返回-1,即當(dāng)前調(diào)用線程放棄獲取,轉(zhuǎn)而準(zhǔn)備進(jìn)入AQS隊(duì)列以保障公平性

        public?class?Semaphore?implements?java.io.Serializable?{

        ????public?void?acquire()?throws?InterruptedException?{
        ????????sync.acquireSharedInterruptibly(1);
        ????}

        ????abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
        ????????//?非公平信號(hào)量獲取許可
        ????????final?int?nonfairTryAcquireShared(int?acquires)?{
        ????????????for?(;;)?{
        ????????????????int?available?=?getState();
        ????????????????int?remaining?=?available?-?acquires;
        ????????????????if?(remaining?0?||
        ????????????????????compareAndSetState(available,?remaining))
        ????????????????????return?remaining;
        ????????????}
        ????????}
        ????}????

        ????static?final?class?NonfairSync?extends?Sync?{
        ????????protected?int?tryAcquireShared(int?acquires)?{
        ????????????return?nonfairTryAcquireShared(acquires);
        ????????}
        ????}

        ????static?final?class?FairSync?extends?Sync?{
        ????????//?公平信號(hào)量獲取許可
        ????????protected?int?tryAcquireShared(int?acquires)?{
        ????????????for?(;;)?{
        ????????????????if?(hasQueuedPredecessors())
        ????????????????//?對(duì)于公平性實(shí)現(xiàn)而言,?如果AQS隊(duì)列存在排隊(duì)的節(jié)點(diǎn)
        ????????????????//?則直接返回-1,?即進(jìn)入AQS隊(duì)列進(jìn)行排隊(duì)以保證公平性
        ????????????????????return?-1;
        ????????????????//?通過訪問AQS的state字段,?獲取當(dāng)前可用的許可數(shù)量????
        ????????????????int?available?=?getState();
        ????????????????//?計(jì)算剩余可用的許可數(shù)量
        ????????????????int?remaining?=?available?-?acquires;
        ????????????????if?(remaining?0?||
        ????????????????????compareAndSetState(available,?remaining))
        ????????????????????return?remaining;
        ????????????}
        ????????}
        ????}
        }

        ...

        public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{

        ????public?final?void?acquireSharedInterruptibly(int?arg)?throws?InterruptedException?{
        ????????//?線程被中斷則直接拋出異常
        ????????if?(Thread.interrupted())
        ????????????throw?new?InterruptedException();

        ????????if?(tryAcquireShared(arg)?0)
        ????????????doAcquireSharedInterruptibly(arg);
        ????}
        ????
        ????//?需要子類去實(shí)現(xiàn)
        ????protected?int?tryAcquireShared(int?arg)?{
        ????????throw?new?UnsupportedOperationException();
        ????}
        }

        release方法

        Semaphore的release()方法類似。其同樣是委托sync調(diào)用AQS的releaseShared方法。然后AQS執(zhí)行tryReleaseShared方法,如果該方法返回true,則會(huì)進(jìn)一步調(diào)用AQS的doReleaseShared方法來喚醒AQS隊(duì)列中其他線程??梢钥吹皆赟emaphore的Sync內(nèi)部類中,tryReleaseShared總是會(huì)返回true。其實(shí)現(xiàn)過程也很簡(jiǎn)單,如下所示

        public?class?Semaphore?implements?java.io.Serializable?{

        ????public?void?release()?{
        ????????sync.releaseShared(1);
        ????}

        ????abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
        ????????protected?final?boolean?tryReleaseShared(int?releases)?{
        ????????????for?(;;)?{
        ????????????????//?通過訪問AQS的state字段,?獲取當(dāng)前可用的許可數(shù)量????
        ????????????????int?current?=?getState();
        ????????????????//?將釋放的許可數(shù)添加到當(dāng)前可用許可數(shù)量上
        ????????????????int?next?=?current?+?releases;
        ????????????????if?(next?//?overflow
        ????????????????????throw?new?Error("Maximum?permit?count?exceeded");
        ????????????????//?通過CAS的方式更新state字段
        ????????????????if?(compareAndSetState(current,?next))
        ????????????????????return?true;
        ????????????}
        ????????}
        ????}

        }

        ...

        public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
        ????public?final?boolean?releaseShared(int?arg)?{
        ????????if?(tryReleaseShared(arg))?{
        ????????????doReleaseShared();
        ????????????return?true;
        ????????}
        ????????return?false;
        ????}
        ????
        ????//?需要子類去實(shí)現(xiàn)
        ????protected?boolean?tryReleaseShared(int?arg)?{
        ????????throw?new?UnsupportedOperationException();
        ????}
        }

        參考文獻(xiàn)

        1. Java并發(fā)編程之美 翟陸續(xù)、薛賓田著
        瀏覽 35
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            亚洲www | 男人女人真曰批40分钟视 | 日韩色狼 | 高清无码动漫 | 日本嫩逼 | 女被男c到爽哭视频免费网站 | 成人精品一区二区三区四区 | 广东少妇大战黑人34厘米视频 | 久久永久免费视频 | 欧美日韩精品欧美黑xB一区 |