1. Java線程池源碼解析及高質(zhì)量代碼案例

        共 68826字,需瀏覽 138分鐘

         ·

        2021-05-30 08:51

        點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”

        優(yōu)質(zhì)文章,第一時間送達(dá)

          作者 |  Star先生

        來源 |  urlify.cn/EvIrIr

        引言

        本文為Java高級編程中的一些知識總結(jié),其中第一章對Jdk 1.7.0_25中的多線程架構(gòu)中的線程池ThreadPoolExecutor源碼進(jìn)行架構(gòu)原理介紹以及源碼解析。第二章則分析了幾個違反Java高質(zhì)量代碼案例以及相應(yīng)解決辦法。如有總結(jié)的不好的地方,歡迎大家提出寶貴的意見和建議。 

        Java線程池架構(gòu)原理及源碼解析

        ThreadPoolExecutor是一個 ExecutorService,它使用可能的幾個池線程之一執(zhí)行每個提交的任務(wù),通常使用 Executors 工廠方法配置。線程池可以解決兩個不同問題:由于減少了每個任務(wù)調(diào)用的開銷,它們通??梢栽趫?zhí)行大量異步任務(wù)時提供增強(qiáng)的性能,并且還可以提供綁定和管理資源(包括執(zhí)行任務(wù)集時使用的線程)的方法。每個 ThreadPoolExecutor 還維護(hù)著一些基本的統(tǒng)計數(shù)據(jù),如完成的任務(wù)數(shù)。

        構(gòu)建參數(shù)源碼

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler)
        {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }

        參數(shù)解釋

        • corePoolSize:核心線程數(shù),會一直存活,即使沒有任務(wù),線程池也會維護(hù)線程的最少數(shù)量。

        • maximumPoolSize:線程池維護(hù)線程的最大數(shù)量。

        • keepAliveTime:線程池維護(hù)線程所允許的空閑時間,當(dāng)線程空閑時間達(dá)到keepAliveTime,該線程會退出,直到線程數(shù)量等于corePoolSize。如果allowCoreThreadTimeout設(shè)置為 
          true,則所有線程均會退出直到線程數(shù)量為0。 
          unit:線程池維護(hù)線程所允許的空閑時間的單位、可選參數(shù)值為:TimeUnit中的幾個靜態(tài)屬性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。

        • workQueue:線程池所使用的緩沖隊列,常用的是:java.util.concurrent.ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。

        • handler:線程池中的數(shù)量大于maximumPoolSize,對拒絕任務(wù)的處理策略,默認(rèn)值ThreadPoolExecutor.AbortPolicy()。

        源碼詳細(xì)解析

        excute源碼

        public void execute(Runnable command)
        {
            if (command == null)
                throw new NullPointerException();
            if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
            {
                if (runState == RUNNING && workQueue.offer(command))
                {
                    if (runState != RUNNING || poolSize == 0)
                        ensureQueuedTaskHandled(command);
                }
                else if (!addIfUnderMaximumPoolSize(command))
                    reject(command); // is shutdown or saturated
            }
        }

        一個任務(wù)通過 execute(Runnable)方法被添加到線程池,任務(wù)就是一個Runnable類型的對象,任務(wù)的執(zhí)行方法就是run()方法,如果傳入的為null,側(cè)拋出NullPointerException。 
        首先第一個判定空操作就不用說了,下面判定的poolSize >= corePoolSize成立時候會進(jìn)入if的區(qū)域,當(dāng)然它不成立也有可能會進(jìn)入,他會判定addIfUnderCorePoolSize是否返回false,如果返回false就會進(jìn)去。 
        如果當(dāng)前線程數(shù)小于corePoolSize,調(diào)用addIfUnderCorePoolSize方法,addIfUnderCorePoolSize方法首先調(diào)用mainLock加鎖,再次判斷當(dāng)前線程數(shù)小于corePoolSize并且線程池處于RUNNING狀態(tài),則調(diào)用addThread增加線程。 


         
        圖一:ThreadPoolExecutor運(yùn)行狀態(tài)圖 


        addIfUnderCorePoolSize源碼

        private boolean addIfUnderCorePoolSize(Runnable firstTask)
        {
            Thread t = null;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try
            {
                if (poolSize < corePoolSize && runState == RUNNING)
                    t = addThread(firstTask);
            }
            finally
            {
                mainLock.unlock();
            }
            if (t == null)
                return false;
            t.start();
            return true;
        }

        addThread方法首先創(chuàng)建Work對象,然后調(diào)用threadFactory創(chuàng)建新的線程,如果創(chuàng)建的線程不為null,將Work對象的 thread屬性設(shè)置為此創(chuàng)建出來的線程,并將此Work對象放入workers中,然后在增加當(dāng)前線程池的中線程數(shù),增加后回到 addIfUnderCorePoolSize方法 ,釋放mainLock,最后啟動這個新創(chuàng)建的線程來執(zhí)行新傳入的任務(wù)。 
        可以發(fā)現(xiàn),這段源碼是如果發(fā)現(xiàn)小于corePoolSize就會創(chuàng)建一個新的線程,并且調(diào)用線程的start()方法將線程運(yùn)行起來:這個addThread()方法,我們先不考慮細(xì)節(jié),因?yàn)槲覀冞€要先看到前面是怎么進(jìn)去的,這里可以發(fā)信啊,只有沒有創(chuàng)建成功Thread才會返回false,也就是當(dāng)當(dāng)前的poolSize > corePoolSize的時候,或線程池已經(jīng)不是在running狀態(tài)的時候才會出現(xiàn)。 
        注意:這里在外部判定一次poolSize和corePoolSize只是初步判定,內(nèi)部是加鎖后判定的,以得到更為準(zhǔn)確的結(jié)果,而外部初步判定如果是大于了,就沒有必要進(jìn)入這段有鎖的代碼了。

        addThread源碼

        private Thread addThread(Runnable firstTask)
        {
            Worker w = new Worker(firstTask);
            Thread t = threadFactory.newThread(w);
            < span style = "color:#ff0000;" > < / span >
                           if (t != null)
            {
                w.thread = t;
                workers.add(w);
                int nt = ++poolSize;
                if (nt > largestPoolSize)
                    largestPoolSize = nt;
            }
            return t;
        }

        ThreadFactory接口默認(rèn)實(shí)現(xiàn)DefaultThreadFactory

        public Thread newThread(Runnable r)
        {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }

        這里創(chuàng)建了一個Work,其余的操作,就是講poolSize疊加,然后將將其放入workers的運(yùn)行隊列等操作; 
        我們主要關(guān)心Worker是干什么的,因?yàn)檫@個threadFactory對我們用途不大,只是做了Thread的命名處理;而Worker你會發(fā)現(xiàn)它的定義也是一個Runnable,外部開始在代碼段中發(fā)現(xiàn)了調(diào)用哪個這個Worker的start()方法,也就是線程的啟動方法,其實(shí)也就是調(diào)用了Worker的run()方法,那么我們重點(diǎn)要關(guān)心run方法是如何處理的。

        Worker的run方法

        public void run()
        {
            try
            {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null)
                {
                    runTask(task);
                    task = null;
                }
            }
            finally
            {
                workerDone(this);
            }
        }

        從以上方法可以看出,Worker所在的線程啟動后,首先執(zhí)行創(chuàng)建其時傳入的Runnable任務(wù),執(zhí)行完成后,循環(huán)調(diào)用getTask來獲取新的任務(wù),在沒有任務(wù)的情況下,退出此線程。FirstTask其實(shí)就是開始在創(chuàng)建work的時候,由外部傳入的Runnable對象,也就是你自己的Thread,你會發(fā)現(xiàn)它如果發(fā)現(xiàn)task為空,就會調(diào)用getTask()方法再判定,直到兩者為空,并且是一個while循環(huán)體。

        getTask源碼

        Runnable getTask()
        {
            for (;;)
            {
                try
                {
                    int state = runState;
                    if (state > SHUTDOWN)
                        return null;
                    Runnable r;
                    if (state == SHUTDOWN)  // Help drain queue
                        r = workQueue.poll();
                    else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                        r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                    else
                        r = workQueue.take();
                    if (r != null)
                        return r;
                    if (workerCanExit())
                    {
                        if (runState >= SHUTDOWN) // Wake up others
                            interruptIdleWorkers();
                        return null;
                    }
                    // Else retry
                }
                catch (InterruptedException ie)
                {
                    // On interruption, re-check runState
                }
            }
        }

        你會發(fā)現(xiàn)它是從workQueue隊列中,也就是等待隊列中獲取一個元素出來并返回!當(dāng)前線程運(yùn)行完后,在到workQueue中去獲取一個task出來,繼續(xù)運(yùn)行,這樣就保證了線程池中有一定的線程一直在運(yùn)行;此時若跳出了while循 環(huán),只有workQueue隊列為空才會出現(xiàn)或出現(xiàn)了類似于shutdown的操作,自然運(yùn)行隊列會減少1,當(dāng)再有新的線程進(jìn)來的時候,就又開始向 worker里面放數(shù)據(jù)了,這樣以此類推,實(shí)現(xiàn)了線程池的功能。

        execute方法部分實(shí)現(xiàn)

        if (runState == RUNNING && workQueue.offer(command))
        {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
        如果當(dāng)前線程池數(shù)量大于corePoolSize或addIfUnderCorePoolSize方法執(zhí)行失敗,則執(zhí)行后續(xù)操作;如果線程池處于運(yùn)行狀態(tài) 并且workQueue中成功加入任務(wù),再次判斷如果線程池的狀態(tài)不為運(yùn)行狀態(tài)或當(dāng)前線程池數(shù)為0,則調(diào)用 ensureQueuedTaskHandled方法

        ensureQueuedTaskHandled源碼

        private void ensureQueuedTaskHandled(Runnable command)
        {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            boolean reject = false;
            Thread t = null;
            try
            {
                int state = runState;
                if (state != RUNNING && workQueue.remove(command))
                    reject = true;
                else if (state < STOP &&
                         poolSize < Math.max(corePoolSize, 1) &&
                         !workQueue.isEmpty())
                    t = addThread(null);
            }
            finally
            {
                mainLock.unlock();
            }
            if (reject)
                reject(command);
            else if (t != null)
                t.start();
        }

        第一個if,也就是當(dāng)當(dāng)前狀態(tài)為running的時候,就會去執(zhí)行workQueue.offer(command),這個workQueue其實(shí)就是一 個BlockingQueue,offer()操作就是在隊列的尾部寫入一個對象,此時寫入的對象為線程的對象而已;所以你可以認(rèn)為只有線程池在 RUNNING狀態(tài),才會在隊列尾部插入數(shù)據(jù),否則就執(zhí)行else if,其實(shí)else if可以看出是要做一個是否大于MaximumPoolSize的判定,如果大于這個值,就會做reject的操作。ensureQueuedTaskHandled方法判斷線程池運(yùn)行,如果狀態(tài)不為運(yùn)行狀態(tài),從workQueue中刪除,并調(diào)用reject做拒絕處理。

        reject源碼

        void reject(Runnable command)
        {
            handler.rejectedExecution(command, this);
        }

        再次回到execute方法

        if (runState == RUNNING && workQueue.offer(command))
        {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated

        如線程池workQueue offer失敗或不處于運(yùn)行狀態(tài),調(diào)用addIfUnderMaximumPoolSize, addIfUnderMaximumPoolSize方法基本和addIfUnderCorePoolSize實(shí)現(xiàn)類似,不同點(diǎn)在于根據(jù)最大線程數(shù)(maximumPoolSize)進(jìn)行比較,如果超過最大線程數(shù),返回false,調(diào)用reject方法。

        addIfUnderMaximumPoolSize源碼

        private boolean addIfUnderMaximumPoolSize(Runnable firstTask)
        {
            Thread t = null;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try
            {
                if (poolSize < maximumPoolSize && runState == RUNNING)
                    t = addThread(firstTask);
            }
            finally
            {
                mainLock.unlock();
            }
            if (t == null)
                return false;
            t.start();
            return true;
        }

        也就是如果線程池滿了,而且線程池調(diào)用了shutdown后,還在調(diào)用execute方法時,就會拋出上面說明的異常:RejectedExecutionException。

        workerDone源碼

        void workerDone(Worker w)
        {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try
            {
                completedTaskCount += w.completedTasks;
                workers.remove(w);
                if (--poolSize == 0)
                    tryTerminate();
            }
            finally
            {
                mainLock.unlock();
            }
        }

        注意這里將workers.remove(w)掉,并且調(diào)用了—poolSize來做操作。至于tryTerminate是做了更多關(guān)于回收方面的操作。

        runTask(task)源碼

        private void runTask(Runnable task)
        {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try
            {
                if (runState < STOP &&
                        Thread.interrupted() &&
                        runState >= STOP)
                    thread.interrupt();
                boolean ran = false;
                beforeExecute(thread, task);
                try
                {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                }
                catch (RuntimeException ex)
                {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            }
            finally
            {
                runLock.unlock();
            }
        }

        你可以看到,這里面的task為傳入的task信息,調(diào)用的不是start方法,而是run方法,因?yàn)閞un方法直接調(diào)用不會啟動新的線程,也是因?yàn)檫@樣,導(dǎo)致了你無法獲取到你自己的線程的狀態(tài),因?yàn)榫€程池是直接調(diào)用的run方法,而不是start方法來運(yùn)行。 
        這里有個beforeExecute和afterExecute方法,分別代表在執(zhí)行前和執(zhí)行后,你可以做一段操作,在這個類中,這兩個方法都是空的,因?yàn)槠胀ň€程池?zé)o需做更多的操作。 
        如果你要實(shí)現(xiàn)類似暫停等待通知的或其他的操作,可以自己extends后進(jìn)行重寫構(gòu)造。

        添加任務(wù)處理流程

        AbortPolicy()

        public static class AbortPolicy implements RejectedExecutionHandler
        {
            /**
             * Creates an {@code AbortPolicy}.
             */
            public AbortPolicy() { }

            /**
             * Always throws RejectedExecutionException.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             * @throws RejectedExecutionException always.
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
            {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
        /*當(dāng)線程池中的數(shù)量等于最大線程數(shù)時,直接拋出拋出java.util.concurrent.RejectedExecutionException異常。*/

        CallerRunsPolicy()

        public static class CallerRunsPolicy implements RejectedExecutionHandler
        {
            /**
             * Creates a {@code CallerRunsPolicy}.
             */
            public CallerRunsPolicy() { }

            /**
             * Executes task r in the caller's thread, unless the executor
             * has been shut down, in which case the task is discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
            {
                if (!e.isShutdown())
                {
                    r.run();
                }
            }
        }

        當(dāng)線程池中的數(shù)量等于最大線程數(shù)時、重試執(zhí)行當(dāng)前的任務(wù),交由調(diào)用者線程來執(zhí)行任務(wù)。

        DiscardOldestPolicy()

        public static class DiscardOldestPolicy implements RejectedExecutionHandler
        {
            /**
             * Creates a {@code DiscardOldestPolicy} for the given executor.
             */
            public DiscardOldestPolicy() { }

            /**
             * Obtains and ignores the next task that the executor
             * would otherwise execute, if one is immediately available,
             * and then retries execution of task r, unless the executor
             * is shut down, in which case task r is instead discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
            {
                if (!e.isShutdown())
                {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }

        當(dāng)線程池中的數(shù)量等于最大線程數(shù)時、拋棄線程池中最后一個要執(zhí)行的任務(wù),并執(zhí)行新傳入的任務(wù)。

        DiscardPolicy()

        public static class DiscardPolicy implements RejectedExecutionHandler
        {
            /**
             * Creates a {@code DiscardPolicy}.
             */
            public DiscardPolicy() { }
            /**
             * Does nothing, which has the effect of discarding task r.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
            {
            }
        }

        當(dāng)線程池中的數(shù)量等于最大線程數(shù)時,不做任何動作。 
        通常你得到線程池后,會調(diào)用其中的:submit方法或execute方法去操作;其實(shí)你會發(fā)現(xiàn),submit方法最終會調(diào)用execute方法來進(jìn)行操 作,只是他提供了一個Future來托管返回值的處理而已,當(dāng)你調(diào)用需要有返回值的信息時,你用它來處理是比較好的;這個Future會包裝對 Callable信息,并定義一個Sync對象,當(dāng)你發(fā)生讀取返回值的操作的時候,會通過Sync對象進(jìn)入鎖,直到有返回值的數(shù)據(jù)通知。

        違反Java高質(zhì)量代碼案例

        異步運(yùn)算使用Callable接口

        Callable接口代碼如下:

        public interface Callable<V>{
            v call() throws Exception;
        }

        實(shí)現(xiàn)Callable接口,只是表明它是一個可調(diào)用的任務(wù),并不表示它具有多線程運(yùn)算的能力,還是要執(zhí)行器來執(zhí)行。代碼如下:

        class TaxCalculator implements Callable<Integer>{

            private int seedMoney;
            public TaxCalculator(int _seedMoney){
                seedMoney=_seedMoney;
            }
            @Override
            public Integer call() throws Exception {
               TimeUnit.MILLISECONDS.sleep(10000);
                return seedMoney/10;
            }

        }

        這里模擬稅款計算器運(yùn)算,可能花費(fèi)10秒鐘時間。用戶輸入即有輸出,若耗時較長,則顯示運(yùn)算進(jìn)度。如果我們直接計算,就只有一個main線程,是不可能友好提示的,如果稅金不計算完畢,也不會執(zhí)行后續(xù)動作,所以最好的辦法就是重啟一個線程來運(yùn)算,讓main線程做進(jìn)度提示

        public static void main(String[] args) throws Exception{
                ExecutorService es=Executors.newSingleThreadExecutor();
                Future<Integer> future=es.submit(new TaxCalculator(100));
                while(!future.isDone()){
                    TimeUnit.MILLISECONDS.sleep(200);
                    System.out.println("#");
                }
                System.out.println("\n 計算完成,稅金是:"+future.get()+"元");
                es.shutdown();

            }

        Executors是一個靜態(tài)工具類,提供了異步執(zhí)行器的創(chuàng)建能力,如單線程執(zhí)行newSingleThreadExcutor、固定線程數(shù)量的執(zhí)行器newFixedThreadPool等,一般是異步計算的入口類。

        優(yōu)先選擇線程池

        線程的狀態(tài)只能由新建狀態(tài)轉(zhuǎn)變?yōu)檫\(yùn)行態(tài)后才可能被阻塞或等待,最后終結(jié),不可能產(chǎn)生本末倒置的情況,代碼如下:

        public static void main(String[] args) throws Exception{

            Thread t=new Thread(new Runnable() {

                @Override
                public void run() {
                    System.out.println("線程在運(yùn)行");

                }
            });
            t.start();
            while(!t.getState().equals(Thread.State.TERMINATED)){
                TimeUnit.MILLISECONDS.sleep(10);
            }
            t.start();
        }

        此時程序運(yùn)行會報IllegalThreadStateException異常,原因就是不能從結(jié)束狀態(tài)直接轉(zhuǎn)換為可運(yùn)行狀態(tài)。這時可以引入線程池,當(dāng)系統(tǒng)需要時直接從線程池中獲得線程,運(yùn)算出結(jié)果,再把線程返回到線程池中,代碼如下:

        public static void main(String[] args) {
                ExecutorService es = Executors.newFixedThreadPool(2);
                for (int i = 0; i < 4; i++) {
                    es.submit(new Runnable() {

                        @Override
                        public void run() {
                            System.out.println(Thread.currentThread().getName());

                        }

                    });
                }
                es.shutdown();
            }

        線程死鎖

        Java是單線程語言,一旦線程死鎖,只能借助外部進(jìn)程重啟應(yīng)用才能解決。

        static class A {
                public synchronized void a1(B b) {
                    String name = Thread.currentThread().getName();
                    System.out.println(name + "進(jìn)入A.a1()");
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        // TODO: handle exception
                    }
                    System.out.println(name + "試圖訪問B.b2()");
                    b.b2();
                }

                public synchronized void a2() {
                    System.out.println("進(jìn)入 a.a2()");
                }
            }

            static class B {
                public synchronized void b1(A a) {
                    String name = Thread.currentThread().getName();
                    System.out.println(name + "進(jìn)入B.b1()");
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        // TODO: handle exception
                    }
                    System.out.println(name + "試圖訪問A.a2()");
                    a.a2();
                }

                public synchronized void b2() {
                    System.out.println("進(jìn)入 B.b2()");
                }
            }

            public static void main(String[] args) {
                final A a = new A();
                final B b = new B();
                new Thread(new Runnable() {

                    @Override
                    public void run() {
                        a.a1(b);
                    }
                }, "線程A").start();
                ;
                new Thread(new Runnable() {

                    @Override
                    public void run() {
                        b.b1(a);
                    }
                }, "線程B").start();
                ;
            }

        此段程序定義了兩個資源A和B,然后在兩個線程A、B中使用了該資源,由于兩個資源之間有交互操作,并且都是同步方法,因此在線程A休眠1秒鐘后,它會試圖訪問資源B的b2方法,但是線程B持有該類的鎖,并同時在等待A線程釋放其鎖資源,所以此時就出現(xiàn)了兩個線程在互相等待釋放資源的情況,也就是死鎖??梢允褂米孕i改進(jìn),代碼如下:

        public  void b2()
        {
            try
            {
                if(Lock.trylock(2, TimeUnit.SECONDS))
                {
                    System.out.println("進(jìn)入 B.b2()");
                }
            }
            catch (InterruptedException e)
            {
                // TODO: handle exception
            }
            finally
            {
                Lock.unlock();
            }


        }

        它原理和互斥鎖一樣,如果一個執(zhí)行單元要想訪問被自旋鎖保護(hù)的共享資源,則必須先得到鎖,在訪問完共享資源后,也必須釋放鎖。

        忽略設(shè)置阻塞隊列長度

        BlockingQueue是一種集合,實(shí)現(xiàn)了Collection接口,容量是不可以自行管理的,代碼如下:

        public static void main(String[] args) throws Exception {
                BlockingDeque<String> bq = (BlockingDeque<String>) new ArrayBlockingQueue<String>(
                        5);
                for (int i = 0; i < 10; i++) {
                    bq.add("");
                }
            }

        阻塞隊列容量是固定的,非阻塞隊列則是變長的。阻塞隊列可以在聲明是指定隊列的容量,若指定的容量,則元素的數(shù)量不可超過該容量,若不指定,隊列的容量為Integer的最大值

        public  class ArrayBlockingQueue<E> extends AbstractQueue<E> implements
            BlockingDeque<E>, java.io.Serializable
        {
            public final E[] items;
            private int count;

            public boolean add(E e)
            {
                if (offer(e))
                    return true;
                else
                    throw new IllegalStateException("Queue full");
            }

            public boolean offer(E e)
            {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try
                {
                    if (count == items.length)
                        ;
                    else
                    {
                        insert(e);
                        return true;
                    }
                }
                finally
                {
                    lock.unlock();
                }
            }

        }

        上面在加入元素時,如果判斷當(dāng)前隊列已滿,則返回false,表示插入失敗,之后再包裝成隊列滿異常。

        使用stop方法停止線程

        stop方法會破壞原子邏輯,代碼如下:

        class MutiThread implements Runnable {
            int a = 0;

            @Override
            public void run() {
                // TODO Auto-generated method stub
                synchronized ("") {
                    a++;
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    a--;
                    String tn = Thread.currentThread().getName();
                    System.out.println(tn + ":a=" + a);

                }
            }

            public static void main(String[] args) {
                MutiThread t = new MutiThread();
                Thread t1 = new Thread(t);
                t1.start();
                for (int i = 0; i < 5; i++) {
                    new Thread(t).start();
                }
                t1.stop();
            }
        }

        所有線程共享了一個MutilThread的實(shí)例變量t,由于在run方法中加入了同步代碼塊,所以只能有一個線程進(jìn)入到synchronized塊中,可以自定義標(biāo)志位來決定線程執(zhí)行情況,代碼如下:

        class SafeStopThread extends Thread{
            private volatile boolean stop=false;
            @Override
            public void run()
            {//判斷線程體是否運(yùn)行
                while(stop)
                {}
            }
            //線程終止
            public void terminate(){
                stop=true;
            }
        }

        在線程體中判斷是否需要停止運(yùn)行,即可保證線程體的邏輯完整性,而且也不會破壞原子邏輯。

        覆寫start方法

        代碼:

        class MutiThread implements Thread
        {


            @Override
            public void start()
            {
                //調(diào)用線程體
                run();
            }
        }
        @Override
        public void run()
        {

        }
        }
        public static void main(String[] args)
        {
            MutiThread t = new MutiThread();
            t.start();
        }

        }

        main方法根本就沒有啟動一個子線程,整個應(yīng)用程序中只有一個主線程在運(yùn)行,并不會創(chuàng)建其他的線程。改進(jìn)后代碼如下:

        class MutiThread implements Thread
        {


            @Override
            public void start()
            {
                /*線程啟動前的業(yè)務(wù)處理*/
                super.start();
                /*線程啟動后的業(yè)務(wù)處理*/
            }
        }
        @Override
        public void run()
        {

        }
        }

        start方法調(diào)用父類的start方法,沒有主動調(diào)用run方法,由JVM自行調(diào)用,不用我們的顯式實(shí)現(xiàn)。

        使用過多線程優(yōu)先級

        Java線程有10個基本,級別為0代表JVM 
        代碼如下:

        class MutiThread implements Runnable {
            public void start(int _priority) {
                Thread t = new Thread(this);
                t.setPriority(_priority);
                t.start();
            }

            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    Math.hypot(Math.pow(924526789, i), Math.cos(i));
                }
                System.out.println("Priority:"+Thread.currentThread().getPriority());
            }
            public static void main(String[] args) {
                for(int i=0;i<20;i++)
                {
                    new MutiThread().start(i%10+1);
                }
            }
        }

        Java優(yōu)先級只是代表搶占CPU機(jī)會大小,優(yōu)先級越高,搶占CPU機(jī)會越大,被優(yōu)先執(zhí)行的可能性越高,優(yōu)先級相差不大,則搶占CPU機(jī)會差別也不大。導(dǎo)致優(yōu)先級為9的線程比優(yōu)先級為10的線程先運(yùn)行。于是在Thread類中設(shè)置三個優(yōu)先級,建議使用優(yōu)先級常量,而不是1到10的隨機(jī)數(shù)字,代碼如下:

          public final static int MIN_PRIORITY = 1;

        /**
          * The default priority that is assigned to a thread.
          */
        public final static int NORM_PRIORITY = 5;

        /**
         * The maximum priority that a thread can have.
         */
        public final static int MAX_PRIORITY = 10;

        /**
         * Returns a reference to the currently executing thread object.
         *
         * @return  the currently executing thread.
         */
        }

        Lock與synchronized

        Lock為顯式鎖,synchronized為內(nèi)部鎖,代碼如下:

        class  Task
        {
            public void dosomething(){
                try {
                    Thread.sleep(2000);
                } catch (Exception e) {
                    // TODO: handle exception
                }
                StringBuffer sb=new StringBuffer();
                sb.append("線程名:"+Thread.currentThread().getName());
                sb.append(",線程時間:"+Calendar.getInstance().get(13)+"s");
                System.out.println(sb);
            }
        }
        //顯示鎖任務(wù)
        class TaskWithLock extends Task implements Runnable{
        private final Lock lock=new ReentrantLock();
            @Override
            public void run() {
                try {
                    lock.lock();
                    dosomething();
                } finally
                {
                    lock.unlock();
                }

            }};
            //內(nèi)部鎖任務(wù)
            class TaskWithSync extends Task implements Runnable{

                @Override
                public void run() {

                        synchronized ("A") {
                            dosomething();

                        }


                }};

        對于同步資源來說,顯式鎖時對象級別的鎖,而內(nèi)部鎖時類級別的鎖,也就是說lock鎖時跟隨對象的,synchronized鎖時跟隨類 
        改進(jìn)方法:把Lock定義為所有線程的共享變量。

        public static void main(String[] args) {
                //多個線程共享鎖
                final Lock lock=new ReentrantLock();
                ……
            }

        線程池異常處理

        Java中線程執(zhí)行的任務(wù)接口java.lang.Runnable 要求不拋出Checked異常,

        public interface Runnable {   

            public abstract void run();   
        }

        那么如果 run() 方法中拋出了RuntimeException,將會怎么處理了? 
        通常java.lang.Thread對象運(yùn)行設(shè)置一個默認(rèn)的異常處理方法:

        java.lang.Thread.setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)  

        而這個默認(rèn)的靜態(tài)全局的異常捕獲方法時輸出堆棧。當(dāng)然,我們可以覆蓋此默認(rèn)實(shí)現(xiàn),只需要一個自定義的java.lang.Thread.UncaughtExceptionHandler接口實(shí)現(xiàn)即可。

        public interface UncaughtExceptionHandler {   

            void uncaughtException(Thread t, Throwable e);   
        }

        而在線程池中卻比較特殊。默認(rèn)情況下,線程池 java.util.concurrent.ThreadPoolExecutor 會Catch住所有異常, 當(dāng)任務(wù)執(zhí)行完成(java.util.concurrent.ExecutorService.submit(Callable))獲取其結(jié)果 時(java.util.concurrent.Future.get())會拋出此RuntimeException。

        /**   
         * Waits if necessary for the computation to complete, and then   
         * retrieves its result.   
         *   
         * @return the computed result   
         * @throws CancellationException if the computation was cancelled   
         * @throws ExecutionException if the computation threw an exception   
         * @throws InterruptedException if the current thread was interrupted while waiting   
         */   
        V get() throws InterruptedException, ExecutionException;

        其中 ExecutionException 異常即是java.lang.Runnable 或者 java.util.concurrent.Callable 拋出的異常。

        也就是說,線程池在執(zhí)行任務(wù)時捕獲了所有異常,并將此異常加入結(jié)果中。這樣一來線程池中的所有線程都將無法捕獲到拋出的異常。從而無法通過設(shè)置線程的默認(rèn)捕獲方法攔截的錯誤異常。也不同通過 自定義線程來完成異常的攔截。好在java.util.concurrent.ThreadPoolExecutor 預(yù)留了一個方法,運(yùn)行在任務(wù)執(zhí)行完畢進(jìn)行擴(kuò)展(當(dāng)然也預(yù)留一個protected方法beforeExecute(Thread t, Runnable r)):

        protected void afterExecute(Runnable r, Throwable t) { } 

        此方法的默認(rèn)實(shí)現(xiàn)為空,這樣我們就可以通過繼承或者覆蓋ThreadPoolExecutor 來達(dá)到自定義的錯誤處理。

        解決辦法如下:

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(11, 100, 1, TimeUnit.MINUTES, //   
                new ArrayBlockingQueue<Runnable>(10000),//   
                new DefaultThreadFactory()) {   

            protected void afterExecute(Runnable r, Throwable t) {   
                super.afterExecute(r, t);   
                printException(r, t);   
            }   
        };   

        private static void printException(Runnable r, Throwable t) {   
            if (t == null && r instanceof Future<?>) {   
                try {   
                    Future<?> future = (Future<?>) r;   
                    if (future.isDone())   
                        future.get();   
                } catch (CancellationException ce) {   
                    t = ce;   
                } catch (ExecutionException ee) {   
                    t = ee.getCause();   
                } catch (InterruptedException ie) {   
                    Thread.currentThread().interrupt(); // ignore/reset   
                }   
            }   
            if (t != null)   
                log.error(t.getMessage(), t);   
        }

        使用SimpleThread類

        TestThreadPool類是一個測試程序,用來模擬客戶端的請求,當(dāng)你運(yùn)行它時,系統(tǒng)首先會顯示線程池的初始化信息,然后提示你從鍵盤上輸入字符串,并按下回車鍵,這時你會發(fā)現(xiàn)屏幕上顯示信息,告訴你某個線程正在處理你的請求,如果你快速地輸入一行行字符串,那么你會發(fā)現(xiàn)線程池中不斷有線程被喚醒,來處理你的請求,在本例中,我創(chuàng)建了一個擁有10個線程的線程池,如果線程池中沒有可用線程了,系統(tǒng)會提示你相應(yīng)的警告信息,但如果你稍等片刻,那你會發(fā)現(xiàn)屏幕上會陸陸續(xù)續(xù)提示有線程進(jìn)入了睡眠狀態(tài),這時你又可以發(fā)送新的請求了。 
        代碼如下:

        //TestThreadPool.java

        import java.io.*;
        public class TestThreadPool
        {
            public static void main(String[] args)
            {
                try
                {
                    BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                    String s;
                    ThreadPoolManager manager = new ThreadPoolManager(10);
                    while((s = br.readLine()) != null)
                    {
                        manager.process(s);
                    }
                }
                catch(IOException e) {}
            }
        }

        ThreadPoolManager類,顧名思義,它是一個用于管理線程池的類,它的主要職責(zé)是初始化線程池,并為客戶端的請求分配不同的線程來進(jìn)行處理,如果線程池滿了,它會對你發(fā)出警告信息。 
        代碼如下:

        import java.util.*;
          
          class ThreadPoolManager
             
        {
             
               private int maxThread;
               public Vector vector;
               public void setMaxThread(int threadCount)
              
            {
                   maxThread = threadCount;
                  
            }
              
               public ThreadPoolManager(int threadCount)
              
            {
                   setMaxThread(threadCount);
                  System.out.println("Starting thread pool...");
                   vector = new Vector();
                   for(int i = 1; i <= 10; i++)
                       {
                       SimpleThread thread = new SimpleThread(i);
                       vector.addElement(thread);
                       thread.start();
                      
                }
                  
            }
              
               public void process(String argument)
              
            {
                   int i;
                   for(i = 0; i < vector.size(); i++)
                      {
                      SimpleThread currentThread = (SimpleThread)vector.elementAt(i);
                       if(!currentThread.isRunning())
                           {
                          System.out.println("Thread " + (i + 1) + " is processing:" +
                          argument);
                         currentThread.setArgument(argument);
                           currentThread.setRunning(true);
                           return;
                         
                    }
                      
                }
                  if(i == vector.size())
                       {
                       System.out.println("pool is full, try in another time.");
                      
                }
                  
            }
              
        }//end of class ThreadPoolManager

        我們先關(guān)注一下這個類的構(gòu)造函數(shù),然后再看它的process()方法。第16-24行是它的構(gòu)造函數(shù),首先它給ThreadPoolManager類的成員變量maxThread賦值,maxThread表示用于控制線程池中最大線程的數(shù)量。第18行初始化一個數(shù)組vector,它用來存放所有的SimpleThread類,這時候就充分體現(xiàn)了JAVA語言的優(yōu)越性與藝術(shù)性:如果你用C語言的話,至少要寫100行以上的代碼來完成vector的功能,而且C語言數(shù)組只能容納類型統(tǒng)一的基本數(shù)據(jù)類型,無法容納對象。好了,閑話少說,第19-24行的循環(huán)完成這樣一個功能:先創(chuàng)建一個新的SimpleThread類,然后將它放入vector中去,最后用thread.start()來啟動這個線程,為什么要用start()方法來啟動線程呢?因?yàn)檫@是JAVA語言中所規(guī)定的,如果你不用的話,那這些線程將永遠(yuǎn)得不到激活,從而導(dǎo)致本示例程序根本無法運(yùn)行。

        process()方法,第30-40行的循環(huán)依次從vector數(shù)組中選取SimpleThread線程,并檢查它是否處于激活狀態(tài)(所謂激活狀態(tài)是指此線程是否正在處理客戶端的請求),如果處于激活狀態(tài)的話,那繼續(xù)查找vector數(shù)組的下一項(xiàng),如果vector數(shù)組中所有的線程都處于激活狀態(tài)的話,那它會打印出一條信息,提示用戶稍候再試。相反如果找到了一個睡眠線程的話,那第35-38行會對此進(jìn)行處理,它先告訴客戶端是哪一個線程來處理這個請求,然后將客戶端的請求,即字符串a(chǎn)rgument轉(zhuǎn)發(fā)給SimpleThread類的setArgument()方法進(jìn)行處理,并調(diào)用SimpleThread類的setRunning()方法來喚醒當(dāng)前線程,來對客戶端請求進(jìn)行處理。

        解決辦法是引入SimpleThread類,它是Thread類的一個子類,它才真正對客戶端的請求進(jìn)行處理,SimpleThread在示例程序初始化時都處于睡眠狀態(tài),但如果它接受到了ThreadPoolManager類發(fā)過來的調(diào)度信息,則會將自己喚醒,并對請求進(jìn)行處理。 
        代碼如下:

        class SimpleThread extends Thread
             
        {
               private boolean runningFlag;
               private String argument;
               public boolean isRunning()
              
            {
                   return runningFlag;
                  
            }
              public synchronized void setRunning(boolean flag)
              
            {
                   runningFlag = flag;
                   if(flag)
                       this.notify();
                  
            }
              
               public String getArgument()
              
            {
                   return this.argument;
                  
            }
               public void setArgument(String string)
              
            {
                   argument = string;
                  
            }
              
               public SimpleThread(int threadNumber)
              
            {
                   runningFlag = false;
                   System.out.println("thread " + threadNumber + "started.");
                  
            }
              
               public synchronized void run()
              
            {
                   try{
                       while(true)
                           {
                           if(!runningFlag)
                               {
                               this.wait();
                              
                        }
                           else
                               {
                               System.out.println("processing " + getArgument() + "... done.");
                               sleep(5000);
                               System.out.println("Thread is sleeping...");
                               setRunning(false);
                              
                        }
                          
                    }
                      
                }
                catch(InterruptedException e)
                {
                       System.out.println("Interrupt");
                      
                }
                  
            }//end of run()

              
        }//end of class SimpleThread

        線程使用不當(dāng)導(dǎo)致內(nèi)存溢出

        代碼如下:

        class IndexCallable implements Callable
        {
            private List<?> t;
            @override
            public object call()
            {
                ……
            }
        }

        程序是這樣的,有一個線程會往List中插入對象,線程池中的多個線程叢List中取數(shù)據(jù),然后進(jìn)行處理,處理完以后把對象從List中刪除。outofmemory有幾種可能:

        1.線程池中的處理線程在處理完以后沒有從List中刪掉元素

        2.向List中插入元素的速度高于從List中刪除元素的速度,造成List中積累的元素數(shù)量不斷攀升,可以隨時打印一下List中的元素數(shù)量,看是否是一支攀升。

        3.ArrayList和LinkedList都不是線程安全的,把List換成Vector或者保證List變量通過Synchronized同步訪問。

        4.在程序的其他地方還持有List中的對象句柄,雖然從List中刪掉了,如果別的地方還保存著該對象的句柄,那么也不會被垃圾回收。

        5.JVM的應(yīng)用程序最大可用內(nèi)存參數(shù)(-Xmx)配置過低

        如:

        JAVA_OPTS="-server -Xms800m -Xmx800m -XX:PermSize=64M -XX:MaxNewSize=256m -XX:MaxPermSize=128m -Djava.awt.headless=true "

        工作隊列

        是同一組固定的工作線程相結(jié)合的工作隊列,它使用 wait() 和 notify() 來通知等待線程新的工作已經(jīng)到達(dá)了。該工作隊列通常被實(shí)現(xiàn)成具有相關(guān)監(jiān)視器對象的某種鏈表,下邊的代碼顯示了簡單的合用工作隊列的示例。盡管 Thread API 沒有對使用 Runnable 接口強(qiáng)加特殊要求,但使用 Runnable 對象隊列的這種模式是調(diào)度程序和工作隊列的公共約定。

        public class WorkQueue
        {

            private final int nThreads;

            private final PoolWorker[] threads;

            private final LinkedList queue;

            public WorkQueue(int nThreads)
            {

                this.nThreads = nThreads;

                queue = new LinkedList();

                threads = new PoolWorker[nThreads];

                for (int i = 0; i

                        threads[i] = new PoolWorker();

                        threads[i].start();

            }

        }

                public void execute(Runnable r)
        {

            synchronized(queue)
            {

                queue.addLast(r);

                queue.notify();

            }

        }

        private class PoolWorker extends Thread
        {

            public void run()
            {

                Runnable r;

                while (true)
                {

                    synchronized(queue)
                    {

                        while (queue.isEmpty())
                        {

                            try

                            {

                                queue.wait();

                            }

                            catch (InterruptedException ignored)

                            {

                            }

                        }

                        r = (Runnable) queue.removeFirst();

                    }

                    // If we don't catch RuntimeException,

                    // the pool could leak threads

                    try
                    {

                        r.run();

                    }

                    catch (RuntimeException e)
                    {

                        // You might want to log something here

                    }

                }

            }

        }

        }

        實(shí)現(xiàn)使用的是 notify() 而不是 notifyAll() 。大多數(shù)專家建議使用 notifyAll() 而不是 notify() ,而且理由很充分:使用 notify() 具有難以捉摸的風(fēng)險,只有在某些特定條件下使用該方法才是合適的。另一方面,如果使用得當(dāng), notify() 具有比 notifyAll() 更可取的性能特征;特別是,notify() 引起的環(huán)境切換要少得多,這一點(diǎn)在服務(wù)器應(yīng)用程序中是很重要的。






        瀏覽 63
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
          
          

            1. 国产三级视频播放 | 欧美高清性色生活片免费观看 | 老师让我她我爽羞羞漫画 | 亚洲午夜成人精品一区二区 | 成人免费毛片 免费 |