1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        JUC 常用4大并發(fā)工具類,值得一看 !

        共 776字,需瀏覽 2分鐘

         ·

        2021-10-13 23:43

        點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

        優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

        什么是JUC?

          JUC就是java.util.concurrent包,這個(gè)包俗稱JUC,里面都是解決并發(fā)問題的一些東西

          該包的位置位于java下面的rt.jar包下面

        4大常用并發(fā)工具類:

          CountDownLatch

          CyclicBarrier

          Semaphore

          ExChanger?

        CountDownLatch:

          CountDownLatch,俗稱閉鎖,作用是類似加強(qiáng)版的Join,是讓一組線程等待其他的線程完成工作以后才執(zhí)行

          就比如在啟動(dòng)框架服務(wù)的時(shí)候,我們主線程需要在環(huán)境線程初始化完成之后才能啟動(dòng),這時(shí)候我們就可以實(shí)現(xiàn)使用CountDownLatch來完成

        /**
        ?????*?Constructs?a?{@code?CountDownLatch}?initialized?with?the?given?count.
        ?????*
        ?????*?@param?count?the?number?of?times?{@link?#countDown}?must?be?invoked
        ?????*????????before?threads?can?pass?through?{@link?#await}
        ?????*?@throws?IllegalArgumentException?if?{@code?count}?is?negative
        ?????*/
        ????public?CountDownLatch(int?count)?{
        ????????if?(count?"count?);
        ????????this.sync?=?new?Sync(count);
        ????}

          在源碼中可以看到,創(chuàng)建CountDownLatch時(shí),需要傳入一個(gè)int類型的參數(shù),將決定在執(zhí)行次扣減之后,等待的線程被喚醒

          ?通過這個(gè)類圖就可以知道其實(shí)CountDownLatch并沒有多少東西

          方法介紹:

            CountDownLatch:初始化方法

            await:等待方法,同時(shí)帶參數(shù)的是超時(shí)重載方法

            countDown:每執(zhí)行一次,計(jì)數(shù)器減一,就是初始化傳入的數(shù)字,也代表著一個(gè)線程完成了任務(wù)

            getCount:獲取當(dāng)前值

            toString:這個(gè)就不用說了

          里面的Sync是一個(gè)內(nèi)部類,外面的方法其實(shí)都是操作這個(gè)內(nèi)部類的,這個(gè)內(nèi)部類繼承了AQS,實(shí)現(xiàn)的標(biāo)準(zhǔn)方法,AQS將在后面的章節(jié)寫

        ?

        主線程中創(chuàng)建CountDownLatch(3),然后主線程await阻塞,然后線程A,B,C各自完成了任務(wù),調(diào)用了countDown,之后,每個(gè)線程調(diào)用一次計(jì)數(shù)器就會(huì)減一,初始是3,然后A線程調(diào)用后變成2,B線程調(diào)用后變成1,C線程調(diào)用后,變成0,這時(shí)就會(huì)喚醒正在await的主線程,然后主線程繼續(xù)執(zhí)行

        說一千道一萬,不如代碼寫幾行,上代碼:

        休眠工具類,之后的代碼都會(huì)用到

        package?org.dance.tools;

        import?java.util.concurrent.TimeUnit;

        /**
        ?*?類說明:線程休眠輔助工具類
        ?*/
        public?class?SleepTools?{

        ????/**
        ?????*?按秒休眠
        ?????*?@param?seconds?秒數(shù)
        ?????*/
        ????public?static?final?void?second(int?seconds)?{
        ????????try?{
        ????????????TimeUnit.SECONDS.sleep(seconds);
        ????????}?catch?(InterruptedException?e)?{
        ????????}
        ????}

        ????/**
        ?????*?按毫秒數(shù)休眠
        ?????*?@param?seconds?毫秒數(shù)
        ?????*/
        ????public?static?final?void?ms(int?seconds)?{
        ????????try?{
        ????????????TimeUnit.MILLISECONDS.sleep(seconds);
        ????????}?catch?(InterruptedException?e)?{
        ????????}
        ????}
        }
        package?org.dance.day2.util;

        import?org.dance.tools.SleepTools;

        import?java.util.concurrent.CountDownLatch;

        /**
        ?*?CountDownLatch的使用,有五個(gè)線程,6個(gè)扣除點(diǎn)
        ?*?扣除完成后主線程和業(yè)務(wù)線程,才能執(zhí)行工作
        ?*??扣除點(diǎn)一般都是大于等于需要初始化的線程的
        ?*?@author?ZYGisComputer
        ?*/
        public?class?UseCountDownLatch?{

        ????/**
        ?????*?設(shè)置為6個(gè)扣除點(diǎn)
        ?????*/
        ????static?CountDownLatch?countDownLatch?=?new?CountDownLatch(6);

        ????/**
        ?????*?初始化線程
        ?????*/
        ????private?static?class?InitThread?implements?Runnable?{

        ????????@Override
        ????????public?void?run()?{

        ????????????System.out.println("thread_"?+?Thread.currentThread().getId()?+?"?ready?init?work?.....");

        ????????????//?執(zhí)行扣減?扣減不代表結(jié)束
        ????????????countDownLatch.countDown();

        ????????????for?(int?i?=?0;?i?????????????????System.out.println("thread_"?+?Thread.currentThread().getId()?+?".....continue?do?its?work");
        ????????????}

        ????????}
        ????}

        ????/**
        ?????*?業(yè)務(wù)線程
        ?????*/
        ????private?static?class?BusiThread?implements?Runnable?{

        ????????@Override
        ????????public?void?run()?{

        ????????????//?業(yè)務(wù)線程需要在等初始化完畢后才能執(zhí)行
        ????????????try?{
        ????????????????countDownLatch.await();
        ????????????????for?(int?i?=?0;?i?????????????????????System.out.println("BusiThread?"?+?Thread.currentThread().getId()?+?"?do?business-----");
        ????????????????}
        ????????????}?catch?(InterruptedException?e)?{
        ????????????????e.printStackTrace();
        ????????????}
        ????????}
        ????}

        ????public?static?void?main(String[]?args)?{

        ????????//?創(chuàng)建單獨(dú)的初始化線程
        ????????new?Thread(){
        ????????????@Override
        ????????????public?void?run()?{
        ????????????????SleepTools.ms(1);
        ????????????????System.out.println("thread_"?+?Thread.currentThread().getId()?+?"?ready?init?work?step?1st.....");
        ????????????????//?扣減一次
        ????????????????countDownLatch.countDown();
        ????????????????System.out.println("begin?stop?2nd.....");
        ????????????????SleepTools.ms(1);
        ????????????????System.out.println("thread_"?+?Thread.currentThread().getId()?+?"?ready?init?work?step?2nd.....");
        ????????????????//?扣減一次
        ????????????????countDownLatch.countDown();

        ????????????}
        ????????}.start();
        ????????//?啟動(dòng)業(yè)務(wù)線程
        ????????new?Thread(new?BusiThread()).start();
        ????????//?啟動(dòng)初始化線程
        ????????for?(int?i?=?0;?i?<=?3;?i++)?{
        ????????????new?Thread(new?InitThread()).start();
        ????????}
        ????????//?主線程進(jìn)入等待
        ????????try?{
        ????????????countDownLatch.await();
        ????????????System.out.println("Main?do?ites?work.....");
        ????????}?catch?(InterruptedException?e)?{
        ????????????e.printStackTrace();
        ????????}

        ????}

        }

        返回結(jié)果:

        thread_13?ready?init?work?.....
        thread_13.....continue?do?its?work
        thread_13.....continue?do?its?work
        thread_14?ready?init?work?.....
        thread_14.....continue?do?its?work
        thread_14.....continue?do?its?work
        thread_15?ready?init?work?.....
        thread_15.....continue?do?its?work
        thread_11?ready?init?work?step?1st.....
        begin?stop?2nd.....
        thread_16?ready?init?work?.....
        thread_16.....continue?do?its?work
        thread_16.....continue?do?its?work
        thread_15.....continue?do?its?work
        thread_11?ready?init?work?step?2nd.....
        Main?do?ites?work.....
        BusiThread?12?do?business-----
        BusiThread?12?do?business-----
        BusiThread?12?do?business-----

        通過返回結(jié)果就可以很直接的看到業(yè)務(wù)線程是在初始化線程完全跑完之后,才開始執(zhí)行的

        ?

        CyclicBarrier:

          CyclicBarrier,俗稱柵欄鎖,作用是讓一組線程到達(dá)某個(gè)屏障,被阻塞,一直到組內(nèi)的最后一個(gè)線程到達(dá),然后屏障開放,接著,所有的線程繼續(xù)運(yùn)行

          這個(gè)感覺和CountDownLatch有點(diǎn)相似,但是其實(shí)是不一樣的,所謂的差別,將在下面詳解

          CyclicBarrier的構(gòu)造參數(shù)有兩個(gè)

        /**
        ?????*?Creates?a?new?{@code?CyclicBarrier}?that?will?trip?when?the
        ?????*?given?number?of?parties?(threads)?are?waiting?upon?it,?and
        ?????*?does?not?perform?a?predefined?action?when?the?barrier?is?tripped.
        ?????*
        ?????*?@param?parties?the?number?of?threads?that?must?invoke?{@link?#await}
        ?????*????????before?the?barrier?is?tripped
        ?????*?@throws?IllegalArgumentException?if?{@code?parties}?is?less?than?1
        ?????*/
        ????public?CyclicBarrier(int?parties)?{
        ????????this(parties,?null);
        ????}
        /**
        ?????*?Creates?a?new?{@code?CyclicBarrier}?that?will?trip?when?the
        ?????*?given?number?of?parties?(threads)?are?waiting?upon?it,?and?which
        ?????*?will?execute?the?given?barrier?action?when?the?barrier?is?tripped,
        ?????*?performed?by?the?last?thread?entering?the?barrier.
        ?????*
        ?????*?@param?parties?the?number?of?threads?that?must?invoke?{@link?#await}
        ?????*????????before?the?barrier?is?tripped
        ?????*?@param?barrierAction?the?command?to?execute?when?the?barrier?is
        ?????*????????tripped,?or?{@code?null}?if?there?is?no?action
        ?????*?@throws?IllegalArgumentException?if?{@code?parties}?is?less?than?1
        ?????*/
        ????public?CyclicBarrier(int?parties,?Runnable?barrierAction)?{
        ????????if?(parties?<=?0)?throw?new?IllegalArgumentException();
        ????????this.parties?=?parties;
        ????????this.count?=?parties;
        ????????this.barrierCommand?=?barrierAction;
        ????}

        很明顯能感覺出來,上面的構(gòu)造參數(shù)調(diào)用了下面的構(gòu)造參數(shù),是一個(gè)構(gòu)造方法重載

        首先這個(gè)第一個(gè)參數(shù)也樹Int類型的,傳入的是執(zhí)行線程的個(gè)數(shù),這個(gè)數(shù)量和CountDownLatch不一樣,這個(gè)數(shù)量是需要和線程數(shù)量吻合的,CountDownLatch則不一樣,CountDownLatch可以大于等于,而CyclicBarrier只能等于,然后是第二個(gè)參數(shù),第二個(gè)參數(shù)是barrierAction,這個(gè)參數(shù)是當(dāng)屏障開放后,執(zhí)行的任務(wù)線程,如果當(dāng)屏障開放后需要執(zhí)行什么任務(wù),可以寫在這個(gè)線程中

        ?

        ?

        主線程創(chuàng)建CyclicBarrier(3,barrierAction),然后由線程開始執(zhí)行,線程A,B執(zhí)行完成后都調(diào)用了await,然后他們都在一個(gè)屏障前阻塞者,需要等待線程C也,執(zhí)行完成,調(diào)用await之后,然后三個(gè)線程都達(dá)到屏障后,屏障開放,然后線程繼續(xù)執(zhí)行,并且barrierAction在屏障開放的一瞬間也開始執(zhí)行

        上代碼:

        package?org.dance.day2.util;

        import?org.dance.tools.SleepTools;

        import?java.util.Map;
        import?java.util.Random;
        import?java.util.concurrent.BrokenBarrierException;
        import?java.util.concurrent.ConcurrentHashMap;
        import?java.util.concurrent.CyclicBarrier;

        /**
        ?*?CyclicBarrier的使用
        ?*
        ?*?@author?ZYGisComputer
        ?*/
        public?class?UseCyclicBarrier?{

        ????/**
        ?????*?存放子線程工作結(jié)果的安全容器
        ?????*/
        ????private?static?ConcurrentHashMap?resultMap?=?new?ConcurrentHashMap<>();

        ????private?static?CyclicBarrier?cyclicBarrier?=?new?CyclicBarrier(5,new?CollectThread());

        ????/**
        ?????*?結(jié)果打印線程
        ?????*?用來演示CyclicBarrier的第二個(gè)參數(shù),barrierAction
        ?????*/
        ????private?static?class?CollectThread?implements?Runnable?{

        ????????@Override
        ????????public?void?run()?{

        ????????????StringBuffer?result?=?new?StringBuffer();

        ????????????for?(Map.Entry?workResult?:?resultMap.entrySet())?{
        ????????????????result.append("["?+?workResult.getValue()?+?"]");
        ????????????}

        ????????????System.out.println("the?result?=?"?+?result);
        ????????????System.out.println("do?other?business.....");

        ????????}
        ????}

        ????/**
        ?????*?工作子線程
        ?????*?用于CyclicBarrier的一組線程
        ?????*/
        ????private?static?class?SubThread?implements?Runnable?{

        ????????@Override
        ????????public?void?run()?{

        ????????????//?獲取當(dāng)前線程的ID
        ????????????long?id?=?Thread.currentThread().getId();

        ????????????//?放入統(tǒng)計(jì)容器中
        ????????????resultMap.put(String.valueOf(id),?id);

        ????????????Random?random?=?new?Random();

        ????????????try?{
        ????????????????if?(random.nextBoolean())?{
        ????????????????????Thread.sleep(1000?+?id);
        ????????????????????System.out.println("Thread_"+id+".....?do?something");
        ????????????????}
        ????????????????System.out.println(id+"?is?await");
        ????????????????cyclicBarrier.await();
        ????????????????Thread.sleep(1000+id);
        ????????????????System.out.println("Thread_"+id+".....do?its?business");
        ????????????}?catch?(InterruptedException?e)?{
        ????????????????e.printStackTrace();
        ????????????}?catch?(BrokenBarrierException?e)?{
        ????????????????e.printStackTrace();
        ????????????}

        ????????}
        ????}

        ????public?static?void?main(String[]?args)?{

        ????????for?(int?i?=?0;?i?<=?4;?i++)?{
        ????????????Thread?thread?=?new?Thread(new?SubThread());
        ????????????thread.start();
        ????????}

        ????}

        }

        返回結(jié)果:

        11?is?await
        14?is?await
        15?is?await
        Thread_12.....?do?something
        12?is?await
        Thread_13.....?do?something
        13?is?await
        the?result?=?[11][12][13][14][15]
        do?other?business.....
        Thread_11.....do?its?business
        Thread_12.....do?its?business
        Thread_13.....do?its?business
        Thread_14.....do?its?business
        Thread_15.....do?its?business

        通過返回結(jié)果可以看出前面的11 14 15三個(gè)線程沒有進(jìn)入if語句塊,在執(zhí)行到await的時(shí)候進(jìn)入了等待,而另外12 13兩個(gè)線程進(jìn)入到了if語句塊當(dāng)中,多休眠了1秒多,然后當(dāng)5個(gè)線程同時(shí)到達(dá)await的時(shí)候,屏障開放,執(zhí)行了barrierAction線程,然后線程組繼續(xù)執(zhí)行

        解釋一下CountDownLatch和CyclicBarrier的卻別吧!

        首先就是CountDownLatch的構(gòu)造參數(shù)傳入的數(shù)量一般都是大于等于線程,數(shù)量的,因?yàn)樗怯械谌娇刂频?可以扣減多次,然后就是CyclicBarrier的構(gòu)造參數(shù)第一個(gè)參數(shù)傳入的數(shù)量一定是等于線程的個(gè)數(shù)的,因?yàn)樗怯梢唤M線程自身控制的

        區(qū)別

              CountDownLatch  CyclicBarrier

        控制  ? ? ? ?第三方控制    ? ?自身控制

        傳入數(shù)量  大于等于線程數(shù)量? ? ? ?等于線程數(shù)量

        ?

        Semaphore:

          Semaphore,俗稱信號(hào)量,作用于控制同時(shí)訪問某個(gè)特定資源的線程數(shù)量,用在流量控制

          一說特定資源控制,那么第一時(shí)間就想到了數(shù)據(jù)庫連接..

          之前用等待超時(shí)模式寫了一個(gè)數(shù)據(jù)庫連接池,打算用這個(gè)Semaphone也寫一個(gè)

        /**
        ?????*?Creates?a?{@code?Semaphore}?with?the?given?number?of
        ?????*?permits?and?nonfair?fairness?setting.
        ?????*
        ?????*?@param?permits?the?initial?number?of?permits?available.
        ?????*????????This?value?may?be?negative,?in?which?case?releases
        ?????*????????must?occur?before?any?acquires?will?be?granted.
        ?????*/
        ????public?Semaphore(int?permits)?{
        ????????sync?=?new?NonfairSync(permits);
        ????}

        在源碼中可以看到在構(gòu)建Semaphore信號(hào)量的時(shí)候,需要傳入許可證的數(shù)量,這個(gè)數(shù)量就是資源的最大允許的訪問的線程數(shù)

        接下里用信號(hào)量實(shí)現(xiàn)一個(gè)數(shù)據(jù)庫連接池

        連接對(duì)象

        package?org.dance.day2.util.pool;

        import?org.dance.tools.SleepTools;

        import?java.sql.*;
        import?java.util.Map;
        import?java.util.Properties;
        import?java.util.concurrent.Executor;

        /**
        ?*?數(shù)據(jù)庫連接
        ?*?@author?ZYGisComputer
        ?*/
        public?class?SqlConnection?implements?Connection?{

        ????/**
        ?????*?獲取數(shù)據(jù)庫連接
        ?????*?@return
        ?????*/
        ????public?static?final?Connection?fetchConnection(){
        ????????return?new?SqlConnection();
        ????}

        ????@Override
        ????public?void?commit()?throws?SQLException?{
        ????????SleepTools.ms(70);
        ????}

        ????@Override
        ????public?Statement?createStatement()?throws?SQLException?{
        ????????SleepTools.ms(1);
        ????????return?null;
        ????}

        ????@Override
        ????public?PreparedStatement?prepareStatement(String?sql)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?CallableStatement?prepareCall(String?sql)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?String?nativeSQL(String?sql)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?void?setAutoCommit(boolean?autoCommit)?throws?SQLException?{

        ????}

        ????@Override
        ????public?boolean?getAutoCommit()?throws?SQLException?{
        ????????return?false;
        ????}

        ????@Override
        ????public?void?rollback()?throws?SQLException?{

        ????}

        ????@Override
        ????public?void?close()?throws?SQLException?{

        ????}

        ????@Override
        ????public?boolean?isClosed()?throws?SQLException?{
        ????????return?false;
        ????}

        ????@Override
        ????public?DatabaseMetaData?getMetaData()?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?void?setReadOnly(boolean?readOnly)?throws?SQLException?{

        ????}

        ????@Override
        ????public?boolean?isReadOnly()?throws?SQLException?{
        ????????return?false;
        ????}

        ????@Override
        ????public?void?setCatalog(String?catalog)?throws?SQLException?{

        ????}

        ????@Override
        ????public?String?getCatalog()?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?void?setTransactionIsolation(int?level)?throws?SQLException?{

        ????}

        ????@Override
        ????public?int?getTransactionIsolation()?throws?SQLException?{
        ????????return?0;
        ????}

        ????@Override
        ????public?SQLWarning?getWarnings()?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?void?clearWarnings()?throws?SQLException?{

        ????}

        ????@Override
        ????public?Statement?createStatement(int?resultSetType,?int?resultSetConcurrency)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?PreparedStatement?prepareStatement(String?sql,?int?resultSetType,?int?resultSetConcurrency)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?CallableStatement?prepareCall(String?sql,?int?resultSetType,?int?resultSetConcurrency)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?Map>?getTypeMap()?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?void?setTypeMap(Map>?map)?throws?SQLException?{

        ????}

        ????@Override
        ????public?void?setHoldability(int?holdability)?throws?SQLException?{

        ????}

        ????@Override
        ????public?int?getHoldability()?throws?SQLException?{
        ????????return?0;
        ????}

        ????@Override
        ????public?Savepoint?setSavepoint()?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?Savepoint?setSavepoint(String?name)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?void?rollback(Savepoint?savepoint)?throws?SQLException?{

        ????}

        ????@Override
        ????public?void?releaseSavepoint(Savepoint?savepoint)?throws?SQLException?{

        ????}

        ????@Override
        ????public?Statement?createStatement(int?resultSetType,?int?resultSetConcurrency,?int?resultSetHoldability)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?PreparedStatement?prepareStatement(String?sql,?int?resultSetType,?int?resultSetConcurrency,?int?resultSetHoldability)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?CallableStatement?prepareCall(String?sql,?int?resultSetType,?int?resultSetConcurrency,?int?resultSetHoldability)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?PreparedStatement?prepareStatement(String?sql,?int?autoGeneratedKeys)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?PreparedStatement?prepareStatement(String?sql,?int[]?columnIndexes)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?PreparedStatement?prepareStatement(String?sql,?String[]?columnNames)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?Clob?createClob()?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?Blob?createBlob()?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?NClob?createNClob()?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?SQLXML?createSQLXML()?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?boolean?isValid(int?timeout)?throws?SQLException?{
        ????????return?false;
        ????}

        ????@Override
        ????public?void?setClientInfo(String?name,?String?value)?throws?SQLClientInfoException?{

        ????}

        ????@Override
        ????public?void?setClientInfo(Properties?properties)?throws?SQLClientInfoException?{

        ????}

        ????@Override
        ????public?String?getClientInfo(String?name)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?Properties?getClientInfo()?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?Array?createArrayOf(String?typeName,?Object[]?elements)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?Struct?createStruct(String?typeName,?Object[]?attributes)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?void?setSchema(String?schema)?throws?SQLException?{

        ????}

        ????@Override
        ????public?String?getSchema()?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?void?abort(Executor?executor)?throws?SQLException?{

        ????}

        ????@Override
        ????public?void?setNetworkTimeout(Executor?executor,?int?milliseconds)?throws?SQLException?{

        ????}

        ????@Override
        ????public?int?getNetworkTimeout()?throws?SQLException?{
        ????????return?0;
        ????}

        ????@Override
        ????public??T?unwrap(Class?iface)?throws?SQLException?{
        ????????return?null;
        ????}

        ????@Override
        ????public?boolean?isWrapperFor(Class?iface)?throws?SQLException?{
        ????????return?false;
        ????}
        }

        連接池對(duì)象

        package?org.dance.day2.util.pool;

        import?java.sql.Connection;
        import?java.util.ArrayList;
        import?java.util.HashSet;
        import?java.util.Iterator;
        import?java.util.LinkedList;
        import?java.util.concurrent.Semaphore;

        /**
        ?*?使用信號(hào)量控制數(shù)據(jù)庫的鏈接和釋放
        ?*
        ?*?@author?ZYGisComputer
        ?*/
        public?class?DBPoolSemaphore?{

        ????/**
        ?????*?池容量
        ?????*/
        ????private?final?static?int?POOL_SIZE?=?10;

        ????/**
        ?????*?useful?代表可用連接
        ?????*?useless?代表已用連接
        ?????*??為什么要使用兩個(gè)Semaphore呢?是因?yàn)?在連接池中不只有連接本身是資源,空位也是資源,也需要記錄
        ?????*/
        ????private?final?Semaphore?useful,?useless;

        ????/**
        ?????*?連接池
        ?????*/
        ????private?final?static?LinkedList?POOL?=?new?LinkedList<>();

        ????/**
        ?????*?使用靜態(tài)塊初始化池
        ?????*/
        ????static?{
        ????????for?(int?i?=?0;?i?????????????POOL.addLast(SqlConnection.fetchConnection());
        ????????}
        ????}

        ????public?DBPoolSemaphore()?{
        ????????//?初始可用的許可證等于池容量
        ????????useful?=?new?Semaphore(POOL_SIZE);
        ????????//?初始不可用的許可證容量為0
        ????????useless?=?new?Semaphore(0);
        ????}

        ????/**
        ?????*?獲取數(shù)據(jù)庫連接
        ?????*
        ?????*?@return?連接對(duì)象
        ?????*/
        ????public?Connection?takeConnection()?throws?InterruptedException?{
        ????????//?可用許可證減一
        ????????useful.acquire();
        ????????Connection?connection;
        ????????synchronized?(POOL)?{
        ????????????connection?=?POOL.removeFirst();
        ????????}
        ????????//?不可用許可證數(shù)量加一
        ????????useless.release();
        ????????return?connection;
        ????}

        ????/**
        ?????*?釋放鏈接
        ?????*
        ?????*?@param?connection?連接對(duì)象
        ?????*/
        ????public?void?returnConnection(Connection?connection)?throws?InterruptedException?{
        ????????if(null!=connection){
        ????????????//?打印日志
        ????????????System.out.println("當(dāng)前有"+useful.getQueueLength()+"個(gè)線程等待獲取連接,,"
        ????????????????????+"可用連接有"+useful.availablePermits()+"個(gè)");
        ????????????//?不可用許可證減一
        ????????????useless.acquire();
        ????????????synchronized?(POOL){
        ????????????????POOL.addLast(connection);
        ????????????}
        ????????????//?可用許可證加一
        ????????????useful.release();
        ????????}
        ????}

        }

        測試類:

        package?org.dance.day2.util.pool;

        import?org.dance.tools.SleepTools;

        import?java.sql.Connection;
        import?java.util.Random;

        /**
        ?*?測試Semaphore
        ?*?@author?ZYGisComputer
        ?*/
        public?class?UseSemaphore?{

        ????/**
        ?????*?連接池
        ?????*/
        ????public?static?final?DBPoolSemaphore?pool?=?new?DBPoolSemaphore();

        ????private?static?class?BusiThread?extends?Thread{
        ????????@Override
        ????????public?void?run()?{
        ????????????//?隨機(jī)數(shù)工具類?為了讓每個(gè)線程持有連接的時(shí)間不一樣
        ????????????Random?random?=?new?Random();
        ????????????long?start?=?System.currentTimeMillis();
        ????????????try?{
        ????????????????Connection?connection?=?pool.takeConnection();
        ????????????????System.out.println("Thread_"+Thread.currentThread().getId()+
        ????????????????????????"_獲取數(shù)據(jù)庫連接耗時(shí)["+(System.currentTimeMillis()-start)+"]ms.");
        ????????????????//?模擬使用連接查詢數(shù)據(jù)
        ????????????????SleepTools.ms(100+random.nextInt(100));
        ????????????????System.out.println("查詢數(shù)據(jù)完成歸還連接");
        ????????????????pool.returnConnection(connection);
        ????????????}?catch?(InterruptedException?e)?{
        ????????????????e.printStackTrace();
        ????????????}
        ????????}
        ????}

        ????public?static?void?main(String[]?args)?{
        ????????for?(int?i?=?0;?i?????????????BusiThread?busiThread?=?new?BusiThread();
        ????????????busiThread.start();
        ????????}
        ????}

        }

        測試返回結(jié)果:

        Thread_11_獲取數(shù)據(jù)庫連接耗時(shí)[0]ms.
        Thread_12_獲取數(shù)據(jù)庫連接耗時(shí)[0]ms.
        Thread_13_獲取數(shù)據(jù)庫連接耗時(shí)[0]ms.
        Thread_14_獲取數(shù)據(jù)庫連接耗時(shí)[0]ms.
        Thread_15_獲取數(shù)據(jù)庫連接耗時(shí)[0]ms.
        Thread_16_獲取數(shù)據(jù)庫連接耗時(shí)[0]ms.
        Thread_17_獲取數(shù)據(jù)庫連接耗時(shí)[0]ms.
        Thread_18_獲取數(shù)據(jù)庫連接耗時(shí)[0]ms.
        Thread_19_獲取數(shù)據(jù)庫連接耗時(shí)[0]ms.
        Thread_20_獲取數(shù)據(jù)庫連接耗時(shí)[0]ms.
        查詢數(shù)據(jù)完成歸還連接
        當(dāng)前有40個(gè)線程等待獲取連接,,可用連接有0個(gè)
        Thread_21_獲取數(shù)據(jù)庫連接耗時(shí)[112]ms.
        查詢數(shù)據(jù)完成歸還連接
        ...................查詢數(shù)據(jù)完成歸還連接
        當(dāng)前有2個(gè)線程等待獲取連接,,可用連接有0個(gè)
        Thread_59_獲取數(shù)據(jù)庫連接耗時(shí)[637]ms.
        查詢數(shù)據(jù)完成歸還連接
        當(dāng)前有1個(gè)線程等待獲取連接,,可用連接有0個(gè)
        Thread_60_獲取數(shù)據(jù)庫連接耗時(shí)[660]ms.
        查詢數(shù)據(jù)完成歸還連接
        當(dāng)前有0個(gè)線程等待獲取連接,,可用連接有0個(gè)
        查詢數(shù)據(jù)完成歸還連接...................
        當(dāng)前有0個(gè)線程等待獲取連接,,可用連接有8個(gè)
        查詢數(shù)據(jù)完成歸還連接
        當(dāng)前有0個(gè)線程等待獲取連接,,可用連接有9個(gè)

        通過執(zhí)行結(jié)果可以很明確的看到,一上來就有10個(gè)線程獲取到了連接,,然后后面的40個(gè)線程進(jìn)入阻塞,然后只有釋放鏈接之后,等待的線程就會(huì)有一個(gè)拿到,然后越后面的線程等待的時(shí)間就越長,然后一直到所有的線程執(zhí)行完畢

        最后打印的可用連接有九個(gè)不是因?yàn)樯倭艘粋€(gè)是因?yàn)樵卺尫胖按蛴〉?不是錯(cuò)誤

        從結(jié)果中可以看到,我們對(duì)連接池中的資源的到了控制,這就是信號(hào)量的流量控制

        ?

        Exchanger:

          Exchanger,俗稱交換器,用于在線程之間交換數(shù)據(jù),但是比較受限,因?yàn)橹荒軆蓚€(gè)線程之間交換數(shù)據(jù)

        /**
        ?????*?Creates?a?new?Exchanger.
        ?????*/
        ????public?Exchanger()?{
        ????????participant?=?new?Participant();
        ????}

        這個(gè)構(gòu)造函數(shù)沒有什么好說的,也沒有入?yún)?只有在創(chuàng)建的時(shí)候指定一下需要交換的數(shù)據(jù)的泛型即可,下面看代碼

        package?org.dance.day2.util;

        import?java.util.HashSet;
        import?java.util.Set;
        import?java.util.concurrent.Exchanger;

        /**
        ?*?線程之間交換數(shù)據(jù)
        ?*?@author?ZYGisComputer
        ?*/
        public?class?UseExchange?{

        ????private?static?final?Exchanger>?exchanger?=?new?Exchanger<>();

        ????public?static?void?main(String[]?args)?{

        ????????new?Thread(){
        ????????????@Override
        ????????????public?void?run()?{
        ????????????????Set?aSet?=?new?HashSet<>();
        ????????????????aSet.add("A");
        ????????????????aSet.add("B");
        ????????????????aSet.add("C");
        ????????????????try?{
        ????????????????????Set?exchange?=?exchanger.exchange(aSet);
        ????????????????????for?(String?s?:?exchange)?{
        ????????????????????????System.out.println("aSet"+s);
        ????????????????????}
        ????????????????}?catch?(InterruptedException?e)?{
        ????????????????????e.printStackTrace();
        ????????????????}
        ????????????}
        ????????}.start();

        ????????new?Thread(){
        ????????????@Override
        ????????????public?void?run()?{
        ????????????????Set?bSet?=?new?HashSet<>();
        ????????????????bSet.add("1");
        ????????????????bSet.add("2");
        ????????????????bSet.add("3");
        ????????????????try?{
        ????????????????????Set?exchange?=?exchanger.exchange(bSet);
        ????????????????????for?(String?s?:?exchange)?{
        ????????????????????????System.out.println("bSet"+s);
        ????????????????????}
        ????????????????}?catch?(InterruptedException?e)?{
        ????????????????????e.printStackTrace();
        ????????????????}
        ????????????}
        ????????}.start();

        ????}

        }

        執(zhí)行結(jié)果:

        bSetA
        bSetB
        bSetC
        aSet1
        aSet2
        aSet3

        通過執(zhí)行結(jié)果可以清晰的看到,兩個(gè)線程中的數(shù)據(jù)發(fā)生了交換,這就是Exchanger的線程數(shù)據(jù)交換了

        以上就是JUC的4大常用并發(fā)工具類了


        ??作者?|??彼岸舞

        來源 |??cnblogs.com/flower-dance/p/13714006.html


        加鋒哥微信:?java1239??
        圍觀鋒哥朋友圈,每天推送Java干貨!

        瀏覽 46
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            一边舌吻一边揉我的胸 | 午夜色网 | 天堂网2014AV | 北条麻妃绝顶高潮90分钟 | 日韩中文无码电影 | 无码日本精品XXXXXXXXX | 操逼网站在线免费观看 | 91丨九色丨国产在线 | 一边亲一边摸免费观看30分钟 | 大鸡巴干逼视频 |