1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Java 線程池中的線程復(fù)用是如何實(shí)現(xiàn)的?

        共 3388字,需瀏覽 7分鐘

         ·

        2020-10-17 00:28

        前幾天,技術(shù)群里有個(gè)群友問了一個(gè)關(guān)于線程池的問題,內(nèi)容如圖所示:

        關(guān)于線程池相關(guān)知識可以先看下這篇:為什么阿里巴巴Java開發(fā)手冊中強(qiáng)制要求線程池不允許使用Executors創(chuàng)建?

        那么就來和大家探討下這個(gè)問題,在線程池中,線程會從 workQueue 中讀取任務(wù)來執(zhí)行,最小的執(zhí)行單位就是 Worker,Worker 實(shí)現(xiàn)了 Runnable 接口,重寫了 run 方法,這個(gè) run 方法是讓每個(gè)線程去執(zhí)行一個(gè)循環(huán),在這個(gè)循環(huán)代碼中,去判斷是否有任務(wù)待執(zhí)行,若有則直接去執(zhí)行這個(gè)任務(wù),因此線程數(shù)不會增加。

        如下是線程池創(chuàng)建線程的整體流程圖:

        首先會判斷線程池的狀態(tài),也就是是否在運(yùn)行,若線程為非運(yùn)行狀態(tài),則會拒絕。接下來會判斷線程數(shù)是否小于核心線程數(shù),若小于核心線程數(shù),會新建工作線程并執(zhí)行任務(wù),隨著任務(wù)的增多,線程數(shù)會慢慢增加至核心線程數(shù),如果此時(shí)還有任務(wù)提交,就會判斷阻塞隊(duì)列 workQueue 是否已滿,若沒滿,則會將任務(wù)放入到阻塞隊(duì)列中,等待工作線程獲得并執(zhí)行,如果任務(wù)提交非常多,使得阻塞隊(duì)達(dá)到上限,會去判斷線程數(shù)是否小于最大線程數(shù) maximumPoolSize,若小于最大線程數(shù),線程池會添加工作線程并執(zhí)行任務(wù),如果仍然有大量任務(wù)提交,使得線程數(shù)等于最大線程數(shù),如果此時(shí)還有任務(wù)提交,就會被拒絕。

        現(xiàn)在我們對這個(gè)流程大致有所了解,那么讓我們?nèi)タ纯丛创a是如何實(shí)現(xiàn)的吧!

        線程池的任務(wù)提交從 submit 方法來說,submit 方法是 AbstractExecutorService 抽象類定義的,主要做了兩件事情:

        1. 把 Runnable 和 Callable 都轉(zhuǎn)化成 FutureTask
        2. 使用 execute 方法執(zhí)行 FutureTask

        execute 方法是 ThreadPoolExecutor 中的方法,源碼如下:

        public void execute(Runnable command) {
        // 若任務(wù)為空,則拋 NPE,不能執(zhí)行空任務(wù)
        if (command == null) {
        throw new NullPointerException();
        }
        int c = ctl.get();
        // 若工作線程數(shù)小于核心線程數(shù),則創(chuàng)建新的線程,并把當(dāng)前任務(wù) command 作為這個(gè)線程的第一個(gè)任務(wù)
        if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) {
        return;
        }
        c = ctl.get();
        }
        /**
        * 至此,有以下兩種情況:
        * 1.當(dāng)前工作線程數(shù)大于等于核心線程數(shù)
        * 2.新建線程失敗
        * 此時(shí)會嘗試將任務(wù)添加到阻塞隊(duì)列 workQueue
        */
        // 若線程池處于 RUNNING 狀態(tài),將任務(wù)添加到阻塞隊(duì)列 workQueue 中
        if (isRunning(c) && workQueue.offer(command)) {
        // 再次檢查線程池標(biāo)記
        int recheck = ctl.get();
        // 如果線程池已不處于 RUNNING 狀態(tài),那么移除已入隊(duì)的任務(wù),并且執(zhí)行拒絕策略
        if (!isRunning(recheck) && remove(command)) {
        // 任務(wù)添加到阻塞隊(duì)列失敗,執(zhí)行拒絕策略
        reject(command);
        }
        // 如果線程池還是 RUNNING 的,并且線程數(shù)為 0,那么開啟新的線程
        else if (workerCountOf(recheck) == 0) {
        addWorker(null, false);
        }
        }
        /**
        * 至此,有以下兩種情況:
        * 1.線程池處于非運(yùn)行狀態(tài),線程池不再接受新的線程
        * 2.線程處于運(yùn)行狀態(tài),但是阻塞隊(duì)列已滿,無法加入到阻塞隊(duì)列
        * 此時(shí)會嘗試以最大線程數(shù)為界創(chuàng)建新的工作線程
        */
        else if (!addWorker(command, false)) {
        // 任務(wù)進(jìn)入線程池失敗,執(zhí)行拒絕策略
        reject(command);
        }
        }

        可以看到 execute 方法中的的核心方法為 addWorker,再去看 addWorker 方法之前,先看下 Worker 的初始化方法:

        Worker(Runnable firstTask) {
        // 每個(gè)任務(wù)的鎖狀態(tài)初始化為-1,這樣工作線程在運(yùn)行之前禁止中斷
        setState(-1);
        this.firstTask = firstTask;
        // 把 Worker 作為 thread 運(yùn)行的任務(wù)
        this.thread = getThreadFactory().newThread(this);
        }

        在 Worker 初始化時(shí)把當(dāng)前 Worker 作為線程的構(gòu)造器入?yún)?,接下來?addWorker 方法中可以找到如下代碼:

        final Thread t = w.thread;
        // 如果成功添加了 Worker,就可以啟動(dòng) Worker 了
        if (workerAdded) {
        t.start();
        workerStarted = true;
        }

        這塊代碼是添加 worker 成功,調(diào)用 start 方法啟動(dòng)線程,Thread t = w.thread; 此時(shí)的 w 是 Worker 的引用,那么t.start();實(shí)際上執(zhí)行的就是 Worker 的 run 方法。

        Worker 的 run 方法中調(diào)用了 runWorker 方法,簡化后的 runWorker 源碼如下:

        final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        while (task != null || (task = getTask()) != null) {
        try {
        task.run();
        } finally {
        task = null;
        }
        }
        }

        這個(gè) while 循環(huán)有個(gè) getTask 方法,getTask 的主要作用是阻塞從隊(duì)列中拿任務(wù)出來,如果隊(duì)列中有任務(wù),那么就可以拿出來執(zhí)行,如果隊(duì)列中沒有任務(wù),這個(gè)線程會一直阻塞到有任務(wù)為止(或者超時(shí)阻塞),其中 getTask 方法的時(shí)序圖如下:

        其中線程復(fù)用的關(guān)鍵是 1.6 和 1.7 部分,這部分源碼如下:

        Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

        使用隊(duì)列的 poll 或 take 方法從隊(duì)列中拿數(shù)據(jù),根據(jù)隊(duì)列的特性,隊(duì)列中有任務(wù)可以返回,隊(duì)列中無任務(wù)會阻塞。

        線程池的線程復(fù)用就是通過取 Worker 的 firstTask 或者通過 getTask 方法從 workQueue 中不停地取任務(wù),并直接調(diào)用 Runnable 的 run 方法來執(zhí)行任務(wù),這樣就保證了每個(gè)線程都始終在一個(gè)循環(huán)中,反復(fù)獲取任務(wù),然后執(zhí)行任務(wù),從而實(shí)現(xiàn)了線程的復(fù)用。

        總結(jié)

        本文主要從源碼的角度解析了 Java 線程池中的線程復(fù)用是如何實(shí)現(xiàn)的。歡迎大家留言交流討論。

        瀏覽 45
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            av射射| 超碰97色色 | 欧美日韩亚洲色图 | 自拍偷拍在线视频 | 做受高潮视频 | 欧美成人在线免费 | 91成人在线视频 | 啊~嗯去浴室做h老师双男主 | 亚洲精品一区中文字幕 | 久久婷婷婷精品秘国产生583 |