1. 【并發(fā)編程】聊聊阻塞隊(duì)列那些事(一)

        共 11419字,需瀏覽 23分鐘

         ·

        2022-07-13 00:27

        線程池的手動(dòng)創(chuàng)建一文中 ,我們介紹了通過 ThreadPoolExecutor創(chuàng)建線程池,其中有一個(gè)參數(shù)是BlockingQueue< Runnable> workQueue,這個(gè)參數(shù)的數(shù)據(jù)類型就是我們今天要介紹的阻塞隊(duì)列, 其作用是當(dāng)線程池的核心線程都在執(zhí)行任務(wù)時(shí),此時(shí)再有任務(wù)提交時(shí)用來存放任務(wù)。

        阻塞隊(duì)列簡(jiǎn)介

        隊(duì)列

        在介紹阻塞隊(duì)列之前,我們先來簡(jiǎn)單介紹下隊(duì)列這種數(shù)據(jù)結(jié)構(gòu)。

        隊(duì)列是一種先進(jìn)先出的線性表,它只允許在表的前端(front)進(jìn)行刪除操作,而在表的后端(rear)進(jìn)行插入操作,進(jìn)行插入操作的端稱為隊(duì)尾,進(jìn)行刪除操作的端稱為隊(duì)頭

        阻塞隊(duì)列

        阻塞隊(duì)列(BlockingQueue)是一種特殊的隊(duì)列,其特殊之處在于:在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强?。?dāng)隊(duì)列滿時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。Java中,阻塞隊(duì)列它是一個(gè)繼承了 Queue 接口的接口,也可以證明阻塞隊(duì)列就是一種特殊隊(duì)列。

        public interface BlockingQueue<E> extends Queue<E>{
        ...
        }

        阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,我們先簡(jiǎn)單了解下生產(chǎn)者-消費(fèi)者模型:兩個(gè)線程操作一個(gè)隊(duì)列,一個(gè)線程往隊(duì)列中插入數(shù)據(jù)(生產(chǎn)線程生產(chǎn)數(shù)據(jù));一個(gè)線程往隊(duì)列中取出數(shù)據(jù)(消費(fèi)線程消費(fèi)數(shù)據(jù))。

        生產(chǎn)者的生產(chǎn)速度和消費(fèi)者的消費(fèi)之間的速度可能不匹配,就可以通過阻塞隊(duì)列讓速度快的暫時(shí)阻塞, 如生產(chǎn)者每秒生產(chǎn)兩個(gè)數(shù)據(jù),而消費(fèi)者每秒消費(fèi)一個(gè)數(shù)據(jù),當(dāng)隊(duì)列已滿時(shí),生產(chǎn)者就會(huì)阻塞(掛起),等待消費(fèi)者消費(fèi)后,再進(jìn)行喚醒。

        阻塞隊(duì)列特點(diǎn)

        阻塞隊(duì)列線程安全

        阻塞隊(duì)列是線程安全的,我們?cè)诔绦蛑惺褂米枞?duì)列不需要自己去考慮更多的線程安全問題。降低了我們開發(fā)的難度和工作量。

        比如在生產(chǎn)者-消費(fèi)者模式使用阻塞隊(duì)列的時(shí)候,因?yàn)?span mp-original-font-size="14" mp-original-line-height="24.5" style="margin: 0px;padding: 0px;outline: 0px;max-width: 100%;box-sizing: border-box !important;word-wrap: break-word !important;color: rgb(255, 41, 65);font-size: 12.25px;line-height: 21.4375px;">阻塞隊(duì)列是線程安全的,所以生產(chǎn)者和消費(fèi)者即便 是多線程環(huán)境,自己不需要去考慮更多的線程安全問題,也不會(huì)發(fā)生線程安全問題。如下圖, 左側(cè)有兩個(gè)生產(chǎn)者線程,它們會(huì)將生產(chǎn)出來的結(jié)果放到中間的阻塞隊(duì)列中,而右側(cè)的兩個(gè)消費(fèi)者只需直接從阻塞隊(duì)列中取出結(jié)果進(jìn)行處理即可。

        阻塞隊(duì)列種類

        下圖展示了Queue 最主要的實(shí)現(xiàn)類,可以看出阻塞隊(duì)列主要 有6種實(shí)現(xiàn)類,分別是 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、PriorityBlockingQueue 和 LinkedTransferQueue,將在下一篇文章再介紹它們各自的特點(diǎn) 。

        阻塞隊(duì)列的容量

        阻塞隊(duì)列分為有界無界兩種。LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一個(gè)數(shù),可以近似認(rèn)為是無限容量的 隊(duì)列,F(xiàn)ixedThreadPool類型線程池的實(shí)現(xiàn)就是用一個(gè)LinkedBlockingQueue來存放任務(wù)的。

        有界阻塞隊(duì)列典型的代表是ArrayBlockingQueue,使用這種阻塞隊(duì)列 ,一旦隊(duì)列容量滿了,就無法再往隊(duì)列里放數(shù)據(jù)了。

        阻塞隊(duì)列常用方法

        阻塞隊(duì)列中有三組和添加、刪除相關(guān)的方法,這三組方法比較相似,我們需要對(duì)這些方法進(jìn)行梳理。這三組方法是:

        • ? 1.拋出異常:add、remove、element

        • ? 2.返回結(jié)果但不拋出異常:offer、poll、peek

        • ? 3.阻塞:put、take

        add、remove、element

        • ? 1.add:往隊(duì)列里添加一個(gè)元素,如果隊(duì)列滿了,就會(huì)拋出異常來提示隊(duì)列已滿;

        • ? 2.remove:刪除元素,如果我們刪除的隊(duì)列是空的,remove 方法就會(huì)拋出異常;

        • ? 3.element:返回隊(duì)列的頭部節(jié)點(diǎn),但是并不刪除,如果隊(duì)列為空,element就會(huì)拋出異常。

        三個(gè)方法示例代碼如下:

        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.BlockingQueue;

        //add方法示例
        public class AddExample {
            public static void main(String[] args) {
                BlockingQueue<String> blockingQueue = 
                        new ArrayBlockingQueue<String>(2);
                
                blockingQueue.add("1");
                blockingQueue.add("2");
                System.out.println("blockingQueue  size:" + blockingQueue.size());

                blockingQueue.add("3");
            }
        }

        ******************【運(yùn)行結(jié)果】******************
        blockingQueue  size:2
        Exception in thread "main" java.lang.
        IllegalStateException: Queue full
            at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
            at java.base/java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:326)
            at thread.blockingqueue.AddExample.main(AddExample.java:13)

        在這段程序中,我們創(chuàng)建了一個(gè)容量為 2 的 BlockingQueue,通過add方法放入元素, 前2個(gè)能正常放入 ,但是在添加第3個(gè)元素的時(shí)候就拋出了IllegalStateException:Queue full異常。

        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.BlockingQueue;

        //remove方法示例
        public class RemoveExample {
            public static void main(String[] args) {
                BlockingQueue<String> blockingQueue =
                        new ArrayBlockingQueue<String>(1);
                blockingQueue.add("1");
                System.out.println("remove element:" + blockingQueue.remove());

                blockingQueue.remove();
            }
        }

        ******************【運(yùn)行結(jié)果】******************
        remove element:1
        Exception in thread "main" java.util.
        NoSuchElementException
            at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117)
            at thread.blockingqueue.RemoveExample.main(RemoveExample.java:14)

        這段程序中,我們創(chuàng)建了一個(gè)容量為1的BlockingQueue并往中間放入一個(gè)元素,然后通過remove方法執(zhí)行刪除操作,刪除第一個(gè)元素正常,此時(shí)BlockingQueue為空,再執(zhí)行刪除操作,拋出NoSuchElementException異常。

        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.BlockingQueue;

        //element示例
        public class ElementExample {
            public static void main(String[] args) {
                BlockingQueue<String> blockingQueue =
                        new ArrayBlockingQueue<String>(1);
                blockingQueue.element();
            }
        }

        ******************【運(yùn)行結(jié)果】******************
        Exception in thread "main" java.util.
        NoSuchElementException
            at java.base/java.util.AbstractQueue.element(AbstractQueue.java:136)
            at thread.blockingqueue.ElementExample.main(ElementExample.java:11)

        這段程序中,我們新建了一個(gè)容量為 1 的 ArrayBlockingQueue,但是并沒有往里面添加元素,也就是說ArrayBlockingQueue為空,我們調(diào)用 element 方法,也得到NoSuchElementException異常。

        offer、poll、peek

        這一組方法當(dāng)發(fā)現(xiàn)隊(duì)列滿了無法添加,或者隊(duì)列為空無法刪除的時(shí)候,是通過返回值來提示我們,而不是像第一組方法那樣拋出異常。

        • ? 1.offer:用來插入一個(gè)元素,用返回值來提示插入是否成功。添加成功返回 true,隊(duì)列已經(jīng)滿,調(diào)用 offer 方法返回false。

        • ? 2.poll:移除并返回隊(duì)列的頭節(jié)點(diǎn),如果當(dāng)隊(duì)列里面是空的返回 null 作為提示。

        • ? 3.peek:返回隊(duì)列的頭元素但并不刪除。如果隊(duì)列里面是空的,它便會(huì)返回 null 作為提示。

        三個(gè)方法示例代碼如下:

        //offer示例
        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.BlockingQueue;

        public class OfferExample {
            public static void main(String[] args) {
                BlockingQueue<String> blockingQueue =
                        new ArrayBlockingQueue<String>(2);

                boolean res1 = blockingQueue.offer("1");
                boolean res2 = blockingQueue.offer("2");
                boolean res3 = blockingQueue.offer("3");

                System.out.println("res1:" + res1);
                System.out.println("res2:" + res2);
                System.out.println("res3:" + res3);
                System.out.println("blockingQueue  size:" + blockingQueue.size());
            }
        }

        ******************【運(yùn)行結(jié)果】******************
        res1:true
        res2:true
        res3:
        false
        blockingQueue  size:2

        在這段程序中,我們創(chuàng)建了一個(gè)容量為 2 的 BlockingQueue,通過offer方法放入元素,前面兩次添加成功了,返回true,但是第三次添加的時(shí)候,已經(jīng)超過了隊(duì)列的最大容量,所以會(huì)返回 false。

        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.BlockingQueue;

        //poll示例
        public class PollExample {
            public static void main(String[] args) {
                BlockingQueue<String> blockingQueue =
                        new ArrayBlockingQueue<String>(1);
                blockingQueue.offer("1");

                //不允許放入null,否則會(huì)拋出NullPointerException異常
                //blockingQueue.offer(null);
                System.out.println("poll element:" + blockingQueue.poll());
                System.out.println("poll element:" + blockingQueue.poll());
            }
        }

        ******************【運(yùn)行結(jié)果】******************
        poll element:1
        poll element:
        null

        這段程序中,我們創(chuàng)建了一個(gè)容量為1的BlockingQueue并往中間放入一個(gè)元素,然后執(zhí)行poll方法,正常執(zhí)行將頭結(jié)點(diǎn)返回并刪除,此時(shí)BlockingQueue為空,再執(zhí)行poll操作,返回null注意,在使用這組方法時(shí),隊(duì)列中不允許插入null值,否則會(huì)拋出NullPointerException異常。

        //peek示例
        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.BlockingQueue;

        public class PeekExample {
            public static void main(String[] args) {
                BlockingQueue<String> blockingQueue =
                        new ArrayBlockingQueue<String>(1);

                System.out.println(blockingQueue.peek());
            }
        }

        ******************【運(yùn)行結(jié)果】******************
        null

        我們新建了一個(gè)容量為 1 的 ArrayBlockingQueue,但是并沒有往里面添加元素,然后直接調(diào)用 peek,返回結(jié)果 null。

        put、take

        • ? 1.put:插入元素。隊(duì)列沒滿的時(shí)候是正常的插入,如果隊(duì)列已滿,插入的線程陷入阻塞狀態(tài),直到隊(duì)列里有了空閑空間,此時(shí)隊(duì)列就會(huì)讓之前的線程解除阻塞狀態(tài),并把剛才那個(gè)元素添加進(jìn)去。

        • ? 2.take:獲取并移除隊(duì)列的頭結(jié)點(diǎn)。隊(duì)列里有數(shù)據(jù)的時(shí)候會(huì)正常取出數(shù)據(jù)并刪除;隊(duì)列里無數(shù)據(jù),則阻塞,直到隊(duì)列里有數(shù)據(jù);一旦隊(duì)列里有數(shù)據(jù)了,就會(huì)立刻解除阻塞狀態(tài),并且取到數(shù)據(jù)。

        兩個(gè)個(gè)方法示例代碼如下:

        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.BlockingQueue;

        //put示例
        public class PutExample {
            public static void main(String[] args) throws InterruptedException {
                BlockingQueue<String> blockingQueue =
                        new ArrayBlockingQueue<String>(2);

                blockingQueue.put("1");
                blockingQueue.put("2");
                System.out.println("【0】blockingQueue  size:" + blockingQueue.size());
                blockingQueue.put("3");

                System.out.println("【1】blockingQueue  size:" + blockingQueue.size());
            }
        }

        ******************【運(yùn)行結(jié)果】******************

        0】blockingQueue  size:2

        //阻塞中 

        在這段程序中,我們創(chuàng)建了一個(gè)容量為 2 的 BlockingQueue,通過put方法添加完前2個(gè)元素,【0】處打印隊(duì)列的元素個(gè)數(shù)是2,說明前面兩個(gè)元素正常放入到隊(duì)列了。當(dāng)執(zhí)行第三次put操作,則由于隊(duì)列已滿使得線程阻塞,后面的【1】處打印得不到執(zhí)行 。

        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.BlockingQueue;

        //take示例
        public class TakeExample {
            public static void main(String[] args) throws InterruptedException {
                BlockingQueue<String> blockingQueue =
                        new ArrayBlockingQueue<String>(1);

                blockingQueue.take();
                System.out.println("測(cè)試能否執(zhí)行到這一句");
            }
        }

        ******************【運(yùn)行結(jié)果】******************
        //阻塞中 

        我們新建了一個(gè)容量為 1 的 ArrayBlockingQueue,但是并沒有往里面添加元素,然后直接調(diào)用take,線程阻塞,打印語句得不到執(zhí)行。

        本文源碼地址:
        https://github.com/qinlizhong1/javaStudy/tree/master/javaExample/src/main/java/thread/blockingqueue

        本文示例代碼環(huán)境:
        操作系統(tǒng):macOs 12.1
        JDK版本:12.0.1
        maven版本: 3.8.4

        —  —

        歡迎關(guān)注↓↓↓
        如有幫助,辛苦點(diǎn)贊和在看
        瀏覽 62
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
          
          

            1. 日韩无码黄色电影 | 亚洲第一黄色视频在线观看 | 女18一成人免费A级毛片 | 另类图片综合 | 91娇羞白丝网站 |