1. JUC 中 4 個(gè)常用的并發(fā)工具類

        共 467字,需瀏覽 1分鐘

         ·

        2021-10-29 14:15

        CountDownLatch

        CountDownLatch是我目前使用比較多的類,CountDownLatch初始化時(shí)會(huì)給定一個(gè)計(jì)數(shù),然后每次調(diào)用countDown() 計(jì)數(shù)減1,

        當(dāng)計(jì)數(shù)未到達(dá)0之前調(diào)用await() 方法會(huì)阻塞直到計(jì)數(shù)減到0;

        使用場(chǎng)景:多用于劃分任務(wù)由多個(gè)線程執(zhí)行,例如:最近寫個(gè)豆瓣爬蟲,需要爬取每個(gè)電影的前五頁(yè)短評(píng),可以劃分成五個(gè)線程來(lái)處理數(shù)據(jù)。通過(guò)latch.await()保證全部完成再返回。

        ????public?void?latch()?throws?InterruptedException?{
        ????????int?count=?5;
        ????????CountDownLatch?latch?=?new?CountDownLatch(count);
        ????????for?(int?x=0;x????????????new?Worker(x*20,latch).start();
        ????????}
        ????????latch.await();
        ????????System.out.println("全部執(zhí)行完畢");
        ????}
        ????
        ????class?Worker?extends?Thread{
        ????????Integer?start;
        ????????CountDownLatch?latch;
        ????????public?Worker(Integer?start,CountDownLatch?latch){
        ????????????this.start=start;
        ????????????this.latch=latch;
        ????????}????????@Override
        ????????public?void?run()?{
        ????????????System.out.println(start+"?已執(zhí)行");
        ????????????latch.countDown();
        ????????}
        ????}

        輸出如下:

        20?已執(zhí)行
        0?已執(zhí)行
        40?已執(zhí)行
        60?已執(zhí)行
        80?已執(zhí)行
        全部執(zhí)行完畢

        CyclicBarrier

        它允許一組線程互相等待,直到到達(dá)某個(gè)公共屏障點(diǎn) (common barrier point)也就是阻塞在調(diào)用cyclicBarrier.await()的地方。

        看上去CyclicBarrier 跟CountDownLatch 功能上類似,在官方doc上CountDownLatch的描述上就說(shuō)明了,CountDownLatch 的計(jì)數(shù)無(wú)法被重置,

        如果需要重置計(jì)數(shù),請(qǐng)考慮使用CyclicBarrier。

        CyclicBarrier初始時(shí)還可添加一個(gè)Runnable的參數(shù), 此Runnable在CyclicBarrier的數(shù)目達(dá)到后,所有其它線程被喚醒前被最后一個(gè)進(jìn)入 CyclicBarrier 的線程執(zhí)行

        使用場(chǎng)景:類似CyclicBarrier,但是 CyclicBarrier提供了幾個(gè)countdownlatch 沒有的方法以應(yīng)付更復(fù)雜的場(chǎng)景,例如:

        getNumberWaiting() 獲取阻塞線程數(shù)量,

        isBroken() 用來(lái)知道阻塞的線程是否被中斷等方法。

        reset() 將屏障重置為其初始狀態(tài)。如果所有參與者目前都在屏障處等待,則它們將返回,同時(shí)拋出一個(gè) BrokenBarrierException。

        ????public?void?latch()?throws?InterruptedException?{
        ????????int?count?=?5;
        ????????CyclicBarrier?cb?=?new?CyclicBarrier(count,?new?Runnable()?{
        ????????????@Override
        ????????????public?void?run()?{
        ????????????????System.out.println("全部執(zhí)行完畢");
        ????????????}
        ????????});
        ????????ExecutorService?executorService?=?Executors.newFixedThreadPool(count);
        ????????while?(true){
        ????????????for?(int?x=0;x????????????????executorService.execute(new?Worker(x,cb));
        ????????????}
        ????????}
        ????}????
        ????
        ????class?Worker?extends?Thread?{
        ????????Integer?start;
        ????????CyclicBarrier?cyclicBarrier;????????public?Worker(Integer?start,?CyclicBarrier?cyclicBarrier)?{
        ????????????this.start?=?start;
        ????????????this.cyclicBarrier?=?cyclicBarrier;
        ????????}????????@Override
        ????????public?void?run()?{
        ????????????System.out.println(start?+?"?已執(zhí)行");
        ????????????try?{
        ????????????????cyclicBarrier.await();
        ????????????}?catch?(InterruptedException?e)?{
        ????????????????e.printStackTrace();
        ????????????}?catch?(BrokenBarrierException?e)?{
        ????????????????e.printStackTrace();
        ????????????}
        ????????}
        ????}

        輸出如下:

        0?已執(zhí)行
        3?已執(zhí)行
        4?已執(zhí)行
        2?已執(zhí)行
        1?已執(zhí)行
        全部執(zhí)行完畢
        0?已執(zhí)行
        1?已執(zhí)行
        2?已執(zhí)行
        3?已執(zhí)行
        4?已執(zhí)行
        全部執(zhí)行完畢

        Semaphore

        Semaphore 信號(hào)量維護(hù)了一個(gè)許可集,每次使用時(shí)執(zhí)行acquire()從Semaphore獲取許可,如果沒有則會(huì)阻塞,每次使用完執(zhí)行release()釋放許可。

        使用場(chǎng)景:Semaphore對(duì)用于對(duì)資源的控制,比如數(shù)據(jù)連接有限,使用Semaphore限制訪問(wèn)數(shù)據(jù)庫(kù)的線程數(shù)。

        ????public?void?latch()?throws?InterruptedException,?IOException?{
        ????????int?count?=?5;
        ????????Semaphore?semaphore?=?new?Semaphore(1);
        ????????ExecutorService?executorService?=?Executors.newFixedThreadPool(count);
        ????????????for?(int?x=0;x????????????????executorService.execute(new?Worker(x,semaphore));
        ????????????}
        ????????System.in.read();
        ????}????
        ????
        ????class?Worker?extends?Thread?{
        ????????Integer?start;
        ????????Semaphore?semaphore;????????public?Worker(Integer?start,?Semaphore?semaphore)?{
        ????????????this.start?=?start;
        ????????????this.semaphore?=?semaphore;
        ????????}????????@Override
        ????????public?void?run()?throws?IllegalArgumentException?{
        ????????????try?{
        ????????????????System.out.println(start?+?"?準(zhǔn)備執(zhí)行");
        ????????????????TimeUnit.SECONDS.sleep(1);
        ????????????????semaphore.acquire();
        ????????????????System.out.println(start?+?"?已經(jīng)執(zhí)行");
        ????????????????semaphore.release();
        ????????????????System.out.println(start?+?"?已經(jīng)釋放");
        ????????????}?catch?(InterruptedException?e)?{
        ????????????????e.printStackTrace();
        ????????????}????????}
        ????}

        輸出如下:

        0?準(zhǔn)備執(zhí)行
        2?準(zhǔn)備執(zhí)行
        1?準(zhǔn)備執(zhí)行
        3?準(zhǔn)備執(zhí)行
        4?準(zhǔn)備執(zhí)行
        2?已經(jīng)執(zhí)行
        2?已經(jīng)釋放
        4?已經(jīng)執(zhí)行
        4?已經(jīng)釋放
        1?已經(jīng)執(zhí)行
        1?已經(jīng)釋放
        0?已經(jīng)執(zhí)行
        0?已經(jīng)釋放
        3?已經(jīng)執(zhí)行
        3?已經(jīng)釋放

        Exchanger

        Exchanger 用于兩個(gè)線程間的數(shù)據(jù)交換,它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn)兩個(gè)線程可以交換彼此的數(shù)據(jù)。

        使用場(chǎng)景:兩個(gè)線程相互等待處理結(jié)果并進(jìn)行數(shù)據(jù)傳遞。

        ????public?void?latch()?throws?InterruptedException,?IOException?{
        ????????int?count?=?5;
        ????????Exchanger?exchanger?=?new?Exchanger<>();
        ????????ExecutorService?executorService?=?Executors.newFixedThreadPool(count);
        ????????????for?(int?x=0;x????????????????executorService.execute(new?Worker(x,exchanger));
        ????????????}
        ????????System.in.read();
        ????}????
        ????
        ????class?Worker?extends?Thread?{
        ????????Integer?start;
        ????????Exchanger??exchanger;????????public?Worker(Integer?start,?Exchanger?exchanger)?{
        ????????????this.start?=?start;
        ????????????this.exchanger?=?exchanger;
        ????????}????????@Override
        ????????public?void?run()?throws?IllegalArgumentException?{
        ????????????try?{
        ????????????????System.out.println(Thread.currentThread().getName()?+?"?準(zhǔn)備執(zhí)行");
        ????????????????TimeUnit.SECONDS.sleep(start);
        ????????????????System.out.println(Thread.currentThread().getName()?+?"?等待交換");
        ????????????????String?value?=?exchanger.exchange(Thread.currentThread().getName());
        ????????????????System.out.println(Thread.currentThread().getName()?+?"?交換得到數(shù)據(jù)為:"+value);
        ????????????}?catch?(InterruptedException?e)?{
        ????????????????e.printStackTrace();
        ????????????}????????}
        ????}

        輸出如下:

        pool-1-thread-1?準(zhǔn)備執(zhí)行
        pool-1-thread-1?等待交換
        pool-1-thread-3?準(zhǔn)備執(zhí)行
        pool-1-thread-2?準(zhǔn)備執(zhí)行
        pool-1-thread-5?準(zhǔn)備執(zhí)行
        pool-1-thread-4?準(zhǔn)備執(zhí)行
        pool-1-thread-2?等待交換
        pool-1-thread-1 交換得到數(shù)據(jù)為:pool-1-thread-2
        pool-1-thread-2 交換得到數(shù)據(jù)為:pool-1-thread-1
        pool-1-thread-3?等待交換
        pool-1-thread-4?等待交換
        pool-1-thread-4 交換得到數(shù)據(jù)為:pool-1-thread-3
        pool-1-thread-3 交換得到數(shù)據(jù)為:pool-1-thread-4
        pool-1-thread-5?等待交換

        Exchanger必須成對(duì)出現(xiàn),否則會(huì)像上面代碼執(zhí)行結(jié)果那樣,pool-1-thread-5一直阻塞等待與其交換數(shù)據(jù)的線程,為了避免這一現(xiàn)象,可以使用exchange(V x, long timeout, TimeUnit unit)設(shè)置最大等待時(shí)長(zhǎng)

        原文出處:https://www.shuzhiduo.com/A/kPzOYlXa5x/


        -?END?-

        「技術(shù)分享」某種程度上,是讓作者和讀者,不那么孤獨(dú)的東西。歡迎關(guān)注我的微信公眾號(hào):「Kirito的技術(shù)分享」



        瀏覽 57
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
          
          

            1. 五月丁香综合网 | 少妇高潮zzzzzzzyⅹ | 欧美天堂一区二区三区 | 午夜激情久久 | 骚逼娇妻 |