1. JAVA線程池源碼全解析

        共 35282字,需瀏覽 71分鐘

         ·

        2021-09-22 20:03

        線程池的具體使用方法和參數解析等我在之前已經講解過,如果對線程池基本用法和概念不清晰的可以先看下我之前的線程池的文章,這里就通過一張線程池運行流程圖來幫助大家去簡單了解下線程池的工作原理。

        線程池源碼我們主要通過ThreadPoolExecutor進行分析,一步一步剖析線程池源碼的核心內容。

        01

        屬性解析

        //高3位:表示當前線程池運行狀態(tài) 除去高3位之后的低位:
            // 表示當前線程池所擁有的線程數量
            private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
            // 表示在ctl中,低COUNT_BITS位 用于存放當前線程數量的位
            private static final int COUNT_BITS = Integer.SIZE - 3;
            //低COUNT_BITS位 所能表達的最大數值
            private static final int CAPACITY = (1 << COUNT_BITS) - 1;

            //表示可接受新任務,且可執(zhí)行隊列中的任務;
            private static final int RUNNING = -1 << COUNT_BITS;
            //表示不接受新任務,但可執(zhí)行隊列中的任務;
            private static final int SHUTDOWN = 0 << COUNT_BITS;
            //表示不接受新任務,且不再執(zhí)行隊列中的任務,且中斷正在執(zhí)行的任務;
            private static final int STOP = 1 << COUNT_BITS;
           // 所有任務已經中止,且工作線程數量為0,最后變遷到這個狀態(tài)的線程將要執(zhí)行terminated()鉤子方法,只會有一個線程執(zhí)行這個方法;
            private static final int TIDYING = 2 << COUNT_BITS;
            //中止狀態(tài),已經執(zhí)行完terminated()鉤子方法;
            private static final int TERMINATED = 3 << COUNT_BITS;

            //任務隊列,當線程池中的線程達到核心線程數量時,再提交任務 就會直接提交到 workQueue
            private final BlockingQueue<Runnable> workQueue;

            //線程池全局鎖,增加worker 減少 worker 時需要持有mainLock , 修改線程池運行狀態(tài)時,也需要。
            private final ReentrantLock mainLock = new ReentrantLock();

            //線程池中真正存放 worker->thread 的地方。
            private final HashSet<Worker> workers = new HashSet<Worker>();

            private final Condition termination = mainLock.newCondition();

            // 記錄線程池生命周期內 線程數最大值
            private int largestPoolSize;

            // 記錄線程池所完成任務總數
            private long completedTaskCount;

            // 創(chuàng)建線程會使用線程工廠
            private volatile ThreadFactory threadFactory;
            /**
             * 拒絕策略
             */

            private volatile RejectedExecutionHandler handler;
            //空閑線程存活時間,當allowCoreThreadTimeOut == false 時,會維護核心線程數量內的線程存活,超出部分會被超時。
            //allowCoreThreadTimeOut == true 核心數量內的線程 空閑時 也會被回收。
            private volatile long keepAliveTime;
            //控制核心線程數量內的線程 是否可以被回收。true 可以,false不可以。
            private volatile boolean allowCoreThreadTimeOut;
            // 核心線程池數量
            private volatile int corePoolSize;

            // 線程池最大數量
            private volatile int maximumPoolSize;

        描述線程池狀態(tài)的屬性是貫穿整個線程池源碼的核心,這里用一張圖來描述一下。

        1. running狀態(tài):當線程池是運行狀態(tài)時,可以接收任務,也可以運行任務。


        2. shutdown狀態(tài):此狀態(tài)下,線程池不會再接收新的任務,當前的任務會繼續(xù)執(zhí)行完成

        3. stop狀態(tài):當調用shutdownNow方法時,線程池會進入stop狀態(tài),不會接受新的任務,正在運行的任務也會被立即終止

        4. tidying狀態(tài):進入該狀態(tài)下此時線程池中任務和線程數量都為空



        線程池運行任務可以通過submit方法和execute方法來完成

        它量的區(qū)別在于submit會有返回值,返回Future對象,通過這個對象可以獲取線程執(zhí)行結果,execute沒有返回值,下面來分別進行進行分析


        02

        execute方法

        首先來分析execute方法,這也是線程池最核心的方法,因為submit方法其底層也是調用execute方法進行執(zhí)行。

        說execute方法之前,先來看下ThreadPoolExecutor的靜態(tài)內部類Worker類。

         //Worker采用了AQS的獨占模式
            //獨占模式:兩個重要屬性  state  和  ExclusiveOwnerThread
            //state:0時表示未被占用 > 0時表示被占用   < 0 時 表示初始狀態(tài),這種情況下不能被搶鎖。
            //ExclusiveOwnerThread:表示獨占鎖的線程。
            private final class Worker
                    extends AbstractQueuedSynchronizer
                    implements Runnable 
        {
                /**
                 * This class will never be serialized, but we provide a
                 * serialVersionUID to suppress a javac warning.
                 */

                private static final long serialVersionUID = 6138294804551838833L;

                // worker內部封裝的工作線程
                final Thread thread;
                //假設firstTask不為空,那么當worker啟動后(內部的線程啟動)會優(yōu)先執(zhí)行firstTask,當執(zhí)行完firstTask后,會到queue中去獲取下一個任務。
                Runnable firstTask;
                // 記錄當前worker所完成的任務數量
                volatile long completedTasks;

                /**
                 * Creates with given first task and thread from ThreadFactory.
                 *
                 * @param firstTask the first task (null if none)
                 */

                Worker(Runnable firstTask) {
                    // 設置AQS獨占模式為初始化中的狀態(tài),這時候不能被搶占
                    setState(-1); // inhibit interrupts until runWorker
                    this.firstTask = firstTask;
                    // 使用線程工廠創(chuàng)建一個線程
                    this.thread = getThreadFactory().newThread(this);
                }


        線程池中的工作線程以Worker作為體現,真正工作的線程為Worker的成員變量,Worker即是Runnable,又是同步器。Worker從工作隊列中取出任務來執(zhí)行,并能通過Worker控制任務狀態(tài)。

        接下來通過execute方法源碼來看下如何通過Worker完成任務的創(chuàng)建及運行。

            public void execute(Runnable command{
                if (command == null)
                    throw new NullPointerException();
                // 獲取ctl的值
                int c = ctl.get();
                // 當前線程數小于核心線程池數量,此次提交任務,直接創(chuàng)建一個新的worker
                // 相對應線程池多了一個新的線程
                if (workerCountOf(c) < corePoolSize) {
                    // addWorker 即為創(chuàng)建線程的過程,會創(chuàng)建worker對象,并且將command作為firstTask
                    // core==true 表示采用核心線程數量限制,false采用maxinumPoolSize
                    if (addWorker(command, true))
                        return;
                    c = ctl.get();
                }
                // 執(zhí)行到這里有幾種情況?
                // 1.當前線程池數量已經達到corePoolSize
                // 2. addWorker失敗


                // 當前線程池處于running狀態(tài),嘗試將task放入到workQueue中
                if (isRunning(c) && workQueue.offer(command)) {
                    // 獲取dangqctl
                    int recheck = ctl.get();
                    // !isRunning()成功,代表當你提交到任務隊列后,線程池狀態(tài)被外部線程給修改,例如調用了shutDown(),shutDownNow()
                    // remove成功,提交之后,線程池中的線程還沒消費
                    // remove 失敗,說明在shutDown或者shutDown之前,就被線程池的線程給處理了
                    if (!isRunning(recheck) && remove(command))
                        reject(command);
                        // 當前線程池是running狀態(tài),
                    else if (workerCountOf(recheck) == 0)
                        // 如果當前沒有線程,就添加一個線程保證當前至少有一個線程存在
                        addWorker(nullfalse);
                }
                //執(zhí)行到這里,有幾種情況?
                //1.offer失敗
                //2.當前線程池是非running狀態(tài)

                //1.offer失敗,需要做什么? 說明當前queue 滿了!這個時候 如果當前線程數量尚未達到maximumPoolSize的話,會創(chuàng)建新的worker直接執(zhí)行command
                //假設當前線程數量達到maximumPoolSize的話,這里也會失敗,也走拒絕策略。

                //2.線程池狀態(tài)為非running狀態(tài),這個時候因為 command != null addWorker 一定是返回false。
                else if (!addWorker(command, false))
                    reject(command);
            }

        execute方法的執(zhí)行流程大致可以分為以下幾步:

        1. 工作線程數量小于核心數量,創(chuàng)建核心線程;

        2. 達到核心數量,進入任務隊列;

        3. 任務隊列滿了,創(chuàng)建非核心線程;

        4. 達到最大數量,執(zhí)行拒絕策略;


        通過這個運行圖再結合上面的源碼可能對這個execute方法的具體執(zhí)行流程就更加清楚了,下面就深入到每一個流程的細節(jié)去分析。

        如果工作線程小于核心線程就會通過addWorker方法創(chuàng)建新的核心任務線程。


        03

        addWorker方法


          //firstTask 可以為null,表示啟動worker之后,worker自動到queue中獲取任務.. 如果不是null,則worker優(yōu)先執(zhí)行firstTask
            //core 采用的線程數限制 如果為true 采用 核心線程數限制  false采用 maximumPoolSize線程數限制.
            private boolean addWorker(Runnable firstTask, boolean core) {
                // 自旋:判斷當前線程池狀態(tài)是否允許創(chuàng)建線程的事情
                retry:
                for (; ; ) {
                    // 獲取當前ctl值
                    int c = ctl.get();
                    // 獲取當前線程池運行狀態(tài)
                    int rs = runStateOf(c);

                    // Check if queue empty only if necessary.
                    // 判斷當前線程池是否允許添加線程
                    if (rs >= SHUTDOWN &&
                            !(rs == SHUTDOWN &&
                                    firstTask == null &&
                                    !workQueue.isEmpty()))
                        return false;
                    // 內部自旋:獲取創(chuàng)建線程令牌的過程
                    for (; ; ) {
                        int wc = workerCountOf(c);
                        //判斷當前線程是否超過限制,超過限制就無法創(chuàng)建線程
                        if (wc >= CAPACITY ||
                                wc >= (core ? corePoolSize : maximumPoolSize))
                            return false;
                        // 通過cas將線程數量加1,能夠成功加1相當于申請到創(chuàng)建線程的令牌
                        if (compareAndIncrementWorkerCount(c))
                            break retry;
                        c = ctl.get();  // Re-read ctl
                        // 判斷當前線程狀態(tài)是否發(fā)生變化
                        if (runStateOf(c) != rs)
                            continue retry;
                        // else CAS failed due to workerCount change; retry inner loop
                    }
                }

                boolean workerStarted = false;
                boolean workerAdded = false;
                Worker w = null;
                try {
                    // 創(chuàng)建work
                    w = new Worker(firstTask);
                    //將新創(chuàng)建的work節(jié)點的線程 賦值給t
                    final Thread t = w.thread;
                    if (t != null) {
                        final ReentrantLock mainLock = this.mainLock;
                        //持有全局鎖,可能會阻塞,直到獲取成功為止,同一時刻操縱 線程池內部相關的操作,都必須持鎖。
                        mainLock.lock();
                        try {
                            //獲取最新線程池運行狀態(tài)保存到rs中
                            int rs = runStateOf(ctl.get());
                            //
                            if (rs < SHUTDOWN ||
                                    (rs == SHUTDOWN && firstTask == null)) {
                                if (t.isAlive()) // precheck that t is startable
                                    throw new IllegalThreadStateException();
                                //將創(chuàng)建的work添加到線程池中
                                workers.add(w);
                                // 獲取最新當前線程池線程數量
                                int s = workers.size();
                                if (s > largestPoolSize)
                                    largestPoolSize = s;
                                workerAdded = true;
                            }
                        } finally {
                            // 釋放鎖
                            mainLock.unlock();
                        }
                        if (workerAdded) {
                            // 添加work成功后,將創(chuàng)建的線程啟動
                            t.start();
                            workerStarted = true;
                        }
                    }
                } finally {
                    // 啟動失敗
                    if (!workerStarted)
                        // 釋放令牌
                        // 將當前worker清理出workers集合
                        addWorkerFailed(w);
                }
                return workerStarted;
            }

        addWorker方法總體就是做了兩件事

        第一步:判斷是否可以創(chuàng)建新的Work

        第二步:如果可以創(chuàng)建就創(chuàng)建新的Work,然后添加到任務隊列當中,最后啟動該線程。

        這里會看到,創(chuàng)建Work會加鎖,加了一個來保證線程安全,新創(chuàng)建的Work會添加到任務隊列當中,這個任務隊列其實就是通過HashSet來存儲work,最后啟動線程,啟動線程后,真正運行這個任務的方法就不在execute當中,而是通過

        Work類中的run方法來執(zhí)行。

        04

        runWorker方法

        通過execute方法來啟動線程后,就會通過work類中的run方法調用ThreadPoolExecutor的runWork方法來運行任務。

           // 當worker啟動時,會執(zhí)行run方法
                public void run() {
                    runWorker(this);
                }

          final void runWorker(Worker w{
                // 工作線程
                Thread wt = Thread.currentThread();
                // 任務
                Runnable task = w.firstTask;
                // 強制釋放鎖
                // 這里相當于無視那邊的中斷標記
                w.firstTask = null;
                w.unlock(); // allow interrupts
                boolean completedAbruptly = true;
                try {
                    // 取任務,如果有第一個任務,這里先執(zhí)行第一個任務
                    // 只要能取到任務,這就是個死循環(huán)
                    // getTask:取任務
                    while (task != null || (task = getTask()) != null) {
                        // 加鎖,是因為當調用shutDown方法它會判斷當前是否加鎖,加鎖就會跳過它接著執(zhí)行下一個任務
                        w.lock();
                        // 檢查線程池狀態(tài)
                        if ((runStateAtLeast(ctl.get(), STOP) ||
                                (Thread.interrupted() &&
                                        runStateAtLeast(ctl.get(), STOP))) &&
                                !wt.isInterrupted())
                            wt.interrupt();
                        try {
                            // 鉤子方法,方便子類在任務執(zhí)行前做一些處理
                            beforeExecute(wt, task);
                            Throwable thrown = null;
                            try {
                                // 真正任務執(zhí)行的地方
                                //task 可能是FutureTask 也可能是 普通的Runnable接口實現類。
                                //如果前面是通過submit()提交的 runnable/callable 會被封裝成 FutureTask。這個不清楚,請看上一期,在b站。
                                task.run();
                            } catch (RuntimeException x) {
                                thrown = x;
                                throw x;
                            } catch (Error x) {
                                thrown = x;
                                throw x;
                            } catch (Throwable x) {
                                thrown = x;
                                throw new Error(x);
                            } finally {
                                afterExecute(task, thrown);
                            }
                        } finally {
                            task = null;
                            w.completedTasks++;
                            w.unlock();
                        }
                    }
                    completedAbruptly = false;
                } finally {
                    processWorkerExit(w, completedAbruptly);
                }
            }

        runWorker方法就是真正執(zhí)行任務的方法,如果有第一個任務就先執(zhí)行第一個任務,第一個任務執(zhí)行完后就通過getTask()方法從任務隊列中獲取任務來執(zhí)行。


        05

        getTask()方法


        private Runnable getTask() {
                // 是否超時
                boolean timedOut = false// Did the last poll() time out?
                // 自旋
                for (; ; ) {
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    //當前程池狀態(tài)是SHUTDOWN的時候會把隊列中的任務執(zhí)行完直到隊列為空
                    // 線程池狀態(tài)是stop時退出
                    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                        decrementWorkerCount();
                        return null;
                    }
                    // 獲取工作線程數量
                    int wc = workerCountOf(c);

                    // 是否允許超時,有兩種情況:
                    // 1. 是允許核心線程數超時,這種就是說所有的線程都可能超時
                    // 2. 是工作線程數大于了核心數量,這種肯定是允許超時的
                    // 注意,非核心線程是一定允許超時的,這里的超時其實是指取任務超時
                    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

                    if ((wc > maximumPoolSize || (timed && timedOut))
                            && (wc > 1 || workQueue.isEmpty())) {
                        if (compareAndDecrementWorkerCount(c))
                            return null;
                        continue;
                    }

                    try {
                        // 真正取任務的地方
                        // 默認情況,只有當工作線程數量大于核心線程數量時,才會調用poll方法觸發(fā)超時調用

                        Runnable r = timed ?
                                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                                workQueue.take();
                        // 取到任務就返回
                        if (r != null)
                            return r;
                        timedOut = true;
                    } catch (InterruptedException retry) {
                        timedOut = false;
                    }
                }
            }


        這里取任務會根據工作線程的數量判斷是使用BlockingQueue的poll(timeout, unit)方法還是take()方法。

        poll(timeout, unit)方法會在超時時返回null,如果timeout<=0,隊列為空時直接返回null。

        take()方法會一直阻塞直到取到任務或拋出中斷異常。

        06

        submit方法


           public <T> Future<T> submit(Callable<T> task{
                if (task == nullthrow new NullPointerException();
                RunnableFuture<T> ftask = newTaskFor(task);
                execute(ftask);
                return ftask;
            }
            public Future<?> submit(Runnable task) {
                if (task == nullthrow new NullPointerException();
                RunnableFuture<Void> ftask = newTaskFor(task, null);
                execute(ftask);
                return ftask;
            }

        submit方法是支持傳入Runnable或者Callable,通過newFaskFor方法將其包裝到FutureTask進行處理,FutureTask會在下篇文章進行詳細講解,futureTask主要做了兩件事,一件事是擴展run方法,用來完成結果值的處理,另一件事是暴露其get方法,通過get方法獲取執(zhí)行結果,這個get方法是阻塞的。



        總結

          

        我做了一張思維導圖,通過這個思維導圖來梳理一遍線程池的大概脈絡

        PS:如果覺得我的分享不錯,歡迎大家隨手點贊、在看。

        (完)




        加我"微信獲取一份 最新Java面試題資料

        請備注:666不然不通過~


        最近好文


        1、Spring Boot 實現掃碼登錄,這種方式太香了?。?/a>

        2、SpringSecurity + JWT 實現單點登錄

        3、基于 Vue+Spring 前后端分離管理系統(tǒng)ELAdmin

        4、Spring Boot 接入支付寶完整流程實戰(zhàn)

        5、Spring Boot 實現多圖片上傳并回顯,漲姿勢了~



        最近面試BAT,整理一份面試資料Java面試BAT通關手冊,覆蓋了Java核心技術、JVM、Java并發(fā)、SSM、微服務、數據庫、數據結構等等。
        獲取方式:關注公眾號并回復 java 領取,更多內容陸續(xù)奉上。
        明天見(??ω??)??
        瀏覽 27
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 美女操B视频软件网战 | 国产成人精品亚洲精品色欲 | 欧美日本| 99欧美精品 | 美女被干网站 |