1. java線程池趣味事:這不是線程池

        共 13718字,需瀏覽 28分鐘

         ·

        2021-02-27 14:24

        走過路過不要錯過

        點擊藍字關注我們


        要想寫出高性能高并發(fā)的應用,自然有許多關鍵,如io,算法,異步,語言特性,操作系統(tǒng)特性,隊列,內(nèi)存,cpu,分布式,網(wǎng)絡,數(shù)據(jù)結構,高性能組件。

        胡說一通先。

        回到主題,線程池。如果說多線程是提高系統(tǒng)并發(fā)能力利器之一,那么線程池就是讓這個利器更容易控制的一種工具。如果我們自己純粹使用多線程基礎特性編寫,那么,必然需要相當老道的經(jīng)驗,才能夠駕馭復雜的環(huán)境。而線程池則不需要,你只需知道如何使用,即可輕松掌控多線程,安全地為你服務。

        1:常見線程池的應用樣例

        線程池,不說本身很簡單,但應用一定是簡單的。

        線程池有許多的實現(xiàn),但我們只說 ThreadPoolExecutor 版本,因其應用最廣泛,別無其他。當然了,還有一個定時調(diào)度線程池 ScheduledThreadPoolExecutor 另說,因其需求場景不同,無法比較。

        下面,我就幾個應用級別,說明下我們?nèi)绾慰焖偈褂镁€程池。(走走過場而已,無關其他)


        1.1. 初級線程池

        初級版本的使用線程池,只需要借助一個工具類即可:Executors . 它提供了許多靜態(tài)方法,你只需隨便選一個就可以使用線程池了。比如:

        // 創(chuàng)建固定數(shù)量的線程池Executors.newFixedThreadPool(8);// 創(chuàng)建無限動態(tài)創(chuàng)建的線程池Executors.newCachedThreadPool();// 創(chuàng)建定時調(diào)度線程池Executors.newScheduledThreadPool(2);// 還有個創(chuàng)建單線程的就不說了,都一樣

        使用上面這些方法創(chuàng)建好的線程池,直接調(diào)用其 execute() 或者 submit() 方法,就可以實現(xiàn)多線程編程了。沒毛??!

        1.2. 中級線程池

        我這里所說的中級,實際就是不使用以上超級簡單方式使用線程池的方式。即你已經(jīng)知道了 ThreadPoolExecutor 這個東東了。這不管你的出發(fā)點是啥!

        // 自定義各線程參數(shù)ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 20, 20, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        具體參數(shù)解釋就不說了,咱們不掃盲。總之,使用這玩意兒,說明你已經(jīng)開始有點門道了。

        1.3. 高級線程池

        實際上,這個版本就沒法具體說如何做了。

        但它可能是,你知道你的線程池應用場景的,你清楚你的硬件運行環(huán)境的,你會使用線程池命名的,你會定義你的隊列大小的,你會考慮上下文切換的,你會考慮線程安全的,你會考慮鎖性能的,你可能會自己造個輪子的。。。

        2. 這不是線程池

        我們通常理解的線程池,就是能夠同時跑多個任務的地方。但有時候線程池不一像線程池,而像一個單線程。來看一個具體的簡單的線程池的使用場景:

            // 初始化線程池    private ExecutorService executor            = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),                Runtime.getRuntime().availableProcessors(),                0L, TimeUnit.SECONDS,                new ArrayBlockingQueue<>(50),                new NamedThreadFactory("test-pool"),                new ThreadPoolExecutor.CallerRunsPolicy());    // 使用線程池處理任務    public Integer doTask(String updateIntervalDesc) throws Exception {        long startTime = System.currentTimeMillis();        List<TestDto> testList;        AtomicInteger affectNum = new AtomicInteger(0);        int pageSize = 1000;        AtomicInteger pageNo = new AtomicInteger(1);        Map<String, Object> condGroupLabel = new HashMap<>();        log.info("start do sth:{}", updateIntervalDesc);        List<Future<?>> futureList = new ArrayList<>();        do {            PageHelper.startPage(pageNo.getAndIncrement(), pageSize);            List<TestDto> list                    = testDao.getLabelListNew(condGroupLabel);            testList = list;            // 循環(huán)向線程池中提交任務            for (TestDto s : list) {                Future<?> future = executor.submit(() -> {                    try {                        // do sth...                        affectNum.incrementAndGet();                    }                    catch (Throwable e) {                        log.error("error:{}", pageNo.get(), e);                    }                });                futureList.add(future);            }        } while (testList.size() >= pageSize);        // 等待任務完成        int i = 0;        for (Future<?> future : futureList) {            future.get();            log.info("done:+{} ", i++);        }        log.info("doTask done:{}, num:{}, cost:{}ms",                updateIntervalDesc, affectNum.get(), System.currentTimeMillis() - startTime);        return affectNum.get();    }


        主要業(yè)務就是,從數(shù)據(jù)庫中取出許多任務,放入線程池中運行。因為任務又涉及到db等的io操作,所以使用多線程處理,非常合理。

        然而,有一種情況的出現(xiàn),也許會打破這個平衡:那就是當單個任務能夠快速執(zhí)行完成時,而且快到剛上一任務提交完成,還沒等下一次提交時,就任務就已被執(zhí)行完成。這時,你就可能會看到一個神奇的現(xiàn)象,即一直只有一個線程在運行任務。這不是線程池該干的事,更像是單線程任務在跑。

        然后,我們可能開始懷疑:某個線程被阻塞了?線程調(diào)度不公平了?隊列選擇不正確了?觸發(fā)jdk bug了?線程池未完全利用的線程了?等等。。。

        然而結果并非如此,糾其原因只是當我們向線程池提交任務時,實際上只是向線程池的隊列中添加了任務。即上面顯示的 ArrayBlockingQueue 添加了任務,而線程池中的各worker負責從隊列中獲取任務進行執(zhí)行。而當任務數(shù)很少時,自然只有一部分worker會處理執(zhí)行中了。至于為什么一直是同一個線程在執(zhí)行,則可能是由于jvm的調(diào)度機制導致。事實上,是受制于 ArrayBlockingQueue.poll() 的公平性。而這個poll()的實現(xiàn)原理,則是由 wait/notify 機制的公平性決定的。

        如下,是線程池的worker工作原理:

            // java.util.concurrent.ThreadPoolExecutor#runWorker    /**     * Main worker run loop.  Repeatedly gets tasks from queue and     * executes them, while coping with a number of issues:     *     * 1. We may start out with an initial task, in which case we     * don't need to get the first one. Otherwise, as long as pool is     * running, we get tasks from getTask. If it returns null then the     * worker exits due to changed pool state or configuration     * parameters.  Other exits result from exception throws in     * external code, in which case completedAbruptly holds, which     * usually leads processWorkerExit to replace this thread.     *     * 2. Before running any task, the lock is acquired to prevent     * other pool interrupts while the task is executing, and then we     * ensure that unless pool is stopping, this thread does not have     * its interrupt set.     *     * 3. Each task run is preceded by a call to beforeExecute, which     * might throw an exception, in which case we cause thread to die     * (breaking loop with completedAbruptly true) without processing     * the task.     *     * 4. Assuming beforeExecute completes normally, we run the task,     * gathering any of its thrown exceptions to send to afterExecute.     * We separately handle RuntimeException, Error (both of which the     * specs guarantee that we trap) and arbitrary Throwables.     * Because we cannot rethrow Throwables within Runnable.run, we     * wrap them within Errors on the way out (to the thread's     * UncaughtExceptionHandler).  Any thrown exception also     * conservatively causes thread to die.     *     * 5. After task.run completes, we call afterExecute, which may     * also throw an exception, which will also cause thread to     * die. According to JLS Sec 14.20, this exception is the one that     * will be in effect even if task.run throws.     *     * The net effect of the exception mechanics is that afterExecute     * and the thread's UncaughtExceptionHandler have as accurate     * information as we can provide about any problems encountered by     * user code.     *     * @param w the worker     */    final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            // worker 不停地向隊列中獲取任務,然后執(zhí)行            // 其中獲取任務的過程,可能被中斷,也可能不會,受到線程池伸縮配置的影響            while (task != null || (task = getTask()) != null) {                w.lock();                // If pool is stopping, ensure thread is interrupted;                // if not, ensure thread is not interrupted.  This                // requires a recheck in second case to deal with                // shutdownNow race while clearing interrupt                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }    /**     * Performs blocking or timed wait for a task, depending on     * current configuration settings, or returns null if this worker     * must exit because of any of:     * 1. There are more than maximumPoolSize workers (due to     *    a call to setMaximumPoolSize).     * 2. The pool is stopped.     * 3. The pool is shutdown and the queue is empty.     * 4. This worker timed out waiting for a task, and timed-out     *    workers are subject to termination (that is,     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})     *    both before and after the timed wait, and if the queue is     *    non-empty, this worker is not the last thread in the pool.     *     * @return task, or null if the worker must exit, in which case     *         workerCount is decremented     */    private Runnable getTask() {        boolean timedOut = false; // Did the last poll() time out?
        for (;;) { int c = ctl.get(); int rs = runStateOf(c);
        // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
        int wc = workerCountOf(c);
        // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
        try { // 可能調(diào)用超時方法,也可能調(diào)用阻塞方法 // 固定線程池的情況下,調(diào)用阻塞 take() 方法 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

        即線程池worker持續(xù)向隊列獲取任務,執(zhí)行即可。而隊列任務的獲取,則由兩個讀寫鎖決定:

            // java.util.concurrent.ArrayBlockingQueue#take    public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;        // 此處鎖,保證執(zhí)行線程安全性        lock.lockInterruptibly();        try {            while (count == 0)                // 此處釋放鎖等待,再次喚醒時,要求必須重新持有鎖                notEmpty.await();            return dequeue();        } finally {            lock.unlock();        }    }    //     /**     * Inserts the specified element at the tail of this queue, waiting     * for space to become available if the queue is full.     *     * @throws InterruptedException {@inheritDoc}     * @throws NullPointerException {@inheritDoc}     */    public void put(E e) throws InterruptedException {        checkNotNull(e);        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == items.length)                notFull.await();            enqueue(e);        } finally {            lock.unlock();        }    }    /**     * Inserts element at current put position, advances, and signals.     * Call only when holding lock.     */    private void enqueue(E x) {        // assert lock.getHoldCount() == 1;        // assert items[putIndex] == null;        final Object[] items = this.items;        items[putIndex] = x;        if (++putIndex == items.length)            putIndex = 0;        count++;        // 通知取等線程,喚醒        notEmpty.signal();    }

        所以,具體誰取到任務,就是要看誰搶到了鎖。而這,可能又涉及到jvm的高效調(diào)度策略啥的了吧。(雖然不確定,但感覺像) 至少,任務運行的表象是,所有任務被某個線程一直搶到。即jvm認為,被某線程搶到是最優(yōu)策略。

        3. 回歸線程池

        線程池的目的,在于處理一些異步的任務,或者并發(fā)的執(zhí)行多個無關聯(lián)的任務。在于讓系統(tǒng)減負。而當任務的提交消耗,大于了任務的執(zhí)行消耗,那就沒必要使用多線程了,或者說這是錯誤的用法了。我們應該線程池做更重的活,而不是輕量級的。如上問題,執(zhí)行性能必然很差。但我們稍做轉變,也許就不一樣了。

            // 初始化線程池    private ExecutorService executor            = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),                Runtime.getRuntime().availableProcessors(),                0L, TimeUnit.SECONDS,                new ArrayBlockingQueue<>(50),                new NamedThreadFactory("test-pool"),                new ThreadPoolExecutor.CallerRunsPolicy());    // 使用線程池處理任務    public Integer doTask(String updateIntervalDesc) throws Exception {        long startTime = System.currentTimeMillis();        List<TestDto> testList;        AtomicInteger affectNum = new AtomicInteger(0);        int pageSize = 1000;        AtomicInteger pageNo = new AtomicInteger(1);        Map<String, Object> condGroupLabel = new HashMap<>();        log.info("start do sth:{}", updateIntervalDesc);        List<Future<?>> futureList = new ArrayList<>();        do {            PageHelper.startPage(pageNo.getAndIncrement(), pageSize);            List<TestDto> list                    = testDao.getLabelListNew(condGroupLabel);            testList = list;            // 一批任務只向線程池中提交任務            Future<?> future = executor.submit(() -> {                for (TestDto s : list) {                    try {                        // do sth...                        affectNum.incrementAndGet();                    }                    catch (Throwable e) {                        log.error("error:{}", pageNo.get(), e);                    }                }            });            futureList.add(future);        } while (testList.size() >= pageSize);        // 等待任務完成        int i = 0;        for (Future<?> future : futureList) {            future.get();            log.info("done:+{} ", i++);        }        log.info("doTask done:{}, num:{}, cost:{}ms",                updateIntervalDesc, affectNum.get(), System.currentTimeMillis() - startTime);        return affectNum.get();    }

        即,讓每個線程執(zhí)行的任務足夠重,以至于完全忽略提交的消耗。這樣才能夠發(fā)揮多線程的作用。




        往期精彩推薦



        騰訊、阿里、滴滴后臺面試題匯總總結 — (含答案)

        面試:史上最全多線程面試題 !

        最新阿里內(nèi)推Java后端面試題

        JVM難學?那是因為你沒認真看完這篇文章


        END


        關注作者微信公眾號 —《JAVA爛豬皮》


        了解更多java后端架構知識以及最新面試寶典


        你點的每個好看,我都認真當成了


        看完本文記得給作者點贊+在看哦~~~大家的支持,是作者源源不斷出文的動力


        作者:等你歸去來

        出處:https://www.cnblogs.com/yougewe/p/14421826.html

        瀏覽 68
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 91超碰香蕉 | 伊人久久无码 | 爱草在线 | 欧美一区二区三区久久久久久桃花 | 超碰国内|