Fun facts about Java thread pools: this is not a thread pool
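Writing high-performance, highly concurrent applications depends on many things: I/O, algorithms, asynchrony, language features, operating-system features, queues, memory, CPU, distributed design, networking, data structures and high-performance components.
Anyway, enough rambling.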
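Back to the topic: thread pools. If multithreading is one of the sharpest tools for raising a system's concurrency, a thread pool is what makes that tool easier to control. If we build everything ourselves on raw threading primitives, we need a great deal of experience to cope with complex environments. A thread pool spares us that. You only need to know how to use it, and it puts multiple threads to work for you easily and safely.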
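1. Common thread pool usage examples
A thread pool may not be simple on the inside, but using one certainly is.
There are many thread pool implementations, but I'll only cover ThreadPoolExecutor, simply because it is the most widely used. The scheduled pool ScheduledThreadPoolExecutor is a separate story: it serves different scenarios, so the two can't really be compared.
Below I'll go through a few levels of usage to show how to get going with a thread pool quickly. (This is just a quick run-through, nothing more.)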
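1.1. Beginner thread pools
At the beginner level, a single utility class is all you need: Executors. It offers many static factory methods, and you can pick any one of them to get a thread pool. For example: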
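// Fixed-size thread pool
Executors.newFixedThreadPool(8);
// Thread pool that creates threads on demand, with no upper bound
Executors.newCachedThreadPool();
// Scheduled thread pool
Executors.newScheduledThreadPool(2);
// There is also a single-thread variant (newSingleThreadExecutor); it works the same way, so I'll skip it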
Call execute() or submit() on any pool created this way and you are doing multithreaded programming. Nothing wrong with that!
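Here is a minimal runnable sketch of that beginner-level usage. It creates a fixed pool with Executors.newFixedThreadPool and uses submit() with a Callable so that results come back through Futures. It then shuts the pool down. The names BasicPoolDemo and sumOfSquares are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BasicPoolDemo {
    // Computes 1^2 + 2^2 + ... + n^2 with one task per term on a fixed pool of 4 threads.
    static int sumOfSquares(int n) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int x = i;
                futures.add(pool.submit(() -> x * x)); // submit(Callable) returns a Future for the result
            }
            int sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get(); // blocks until that task has completed
            }
            return sum;
        } finally {
            pool.shutdown(); // accept no new tasks; already-submitted ones still run to completion
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(10)); // 385
    }
}
```

Always shutting the pool down matters. The worker threads of a fixed pool are non-daemon, so a pool that is never shut down can keep the JVM alive.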
1.2. Intermediate thread pools
By intermediate I simply mean not using the dead-simple approach above. In other words, you already know about ThreadPoolExecutor, whatever your reason for using it.
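// Configure every parameter yourself: corePoolSize=4, maximumPoolSize=20, keepAliveTime=20 ms, unbounded LinkedBlockingQueue
// (with an unbounded queue the pool never grows beyond its 4 core threads, so maximumPoolSize=20 has no effect)
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 20, 20, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());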
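I won't explain the parameters one by one, since this isn't a beginner's tutorial. In short, if you are using this class, you have started to learn the ropes.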
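One parameter interaction often surprises people. ThreadPoolExecutor creates threads beyond corePoolSize only when the work queue refuses a task. The sketch below shows this; the class PoolGrowthDemo is hypothetical. Each task blocks on a latch, so nothing finishes early. With an unbounded LinkedBlockingQueue the pool stays at its 2 core threads. With an ArrayBlockingQueue of capacity 2, the pool grows to its maximum of 4.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    // Submits `tasks` blocking tasks to a pool with core=2, max=4 backed by `queue`,
    // and returns how many threads the pool has created at that point.
    static int poolSizeWith(BlockingQueue<Runnable> queue, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 20, TimeUnit.MILLISECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                // every task parks until we release it, so no thread frees up early
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // unbounded queue: extra tasks are queued, threads never exceed the core size
        System.out.println(poolSizeWith(new LinkedBlockingQueue<>(), 10)); // 2
        // bounded queue of 2: tasks 5 and 6 are refused by the full queue, so two extra threads start
        System.out.println(poolSizeWith(new ArrayBlockingQueue<>(2), 6)); // 4
    }
}
```

In the bounded case, a seventh task would find both the queue and the pool full. It would then go to the rejection policy, which is AbortPolicy by default.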
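1.3. Advanced thread pools
Honestly, there is no concrete recipe for this level.
But it might look like this: you know your pool's use case and you understand the hardware it runs on. You name your pool's threads, set your queue size deliberately, and think about context switching, thread safety and lock performance. You might even roll your own...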
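Two of the habits listed above are naming pool threads and bounding the queue. The example in section 2 also relies on a NamedThreadFactory that is not part of the JDK. Below is a minimal sketch of what such a factory could look like, assuming thread names of the form prefix-1, prefix-2, and so on. The class is hypothetical and not necessarily the one the author used.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical NamedThreadFactory: the JDK itself has no class by this name.
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger next = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // "prefix-1", "prefix-2", ... makes pool threads easy to find in logs and thread dumps
        return new Thread(r, prefix + "-" + next.getAndIncrement());
    }

    public static void main(String[] args) throws Exception {
        int cpus = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                cpus, cpus, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(50),                 // bounded: at most 50 waiting tasks
                new NamedThreadFactory("test-pool"),
                new ThreadPoolExecutor.CallerRunsPolicy());   // when full, the submitting thread runs the task
        // the first task starts the first worker, so it runs on "test-pool-1"
        System.out.println(pool.submit(() -> Thread.currentThread().getName()).get()); // test-pool-1
        pool.shutdown();
    }
}
```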
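2. This is not a thread pool
We usually think of a thread pool as a place where many tasks run at the same time. Sometimes, though, a thread pool doesn't look like a thread pool at all; it looks more like a single thread. Consider this concrete, simple use case: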
// Initialize the thread pool
// (NamedThreadFactory, PageHelper, testDao and TestDto are not JDK classes; they come from the author's
// project or its libraries, e.g. PageHelper is the MyBatis paging plugin)
private ExecutorService executor = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        Runtime.getRuntime().availableProcessors(),
        0L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(50),
        new NamedThreadFactory("test-pool"),
        new ThreadPoolExecutor.CallerRunsPolicy());

// Process the tasks with the thread pool
public Integer doTask(String updateIntervalDesc) throws Exception {
    long startTime = System.currentTimeMillis();
    List<TestDto> testList;
    AtomicInteger affectNum = new AtomicInteger(0);
    int pageSize = 1000;
    AtomicInteger pageNo = new AtomicInteger(1);
    Map<String, Object> condGroupLabel = new HashMap<>();
    log.info("start do sth:{}", updateIntervalDesc);
    List<Future<?>> futureList = new ArrayList<>();
    do {
        PageHelper.startPage(pageNo.getAndIncrement(), pageSize);
        List<TestDto> list = testDao.getLabelListNew(condGroupLabel);
        testList = list;
        // Submit one task per record to the thread pool
        for (TestDto s : list) {
            Future<?> future = executor.submit(() -> {
                try {
                    // do sth...
                    affectNum.incrementAndGet();
                } catch (Throwable e) {
                    log.error("error:{}", pageNo.get(), e);
                }
            });
            futureList.add(future);
        }
    } while (testList.size() >= pageSize);
    // Wait for all tasks to finish
    int i = 0;
    for (Future<?> future : futureList) {
        future.get();
        log.info("done:+{} ", i++);
    }
    log.info("doTask done:{}, num:{}, cost:{}ms",
            updateIntervalDesc, affectNum.get(), System.currentTimeMillis() - startTime);
    return affectNum.get();
}
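The job is simple: fetch a large number of records from the database and process each one in the thread pool. Each task also involves I/O such as database access, so using multiple threads is perfectly reasonable.
One situation can upset this balance, however. A single task may complete so quickly that it has already finished before the next one is even submitted. You may then see a strange phenomenon: one thread runs (almost) all of the tasks. That is not what a thread pool is supposed to do; it looks more like a single-threaded job.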
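The sketch below lets you watch for this effect yourself; the class TinyTaskDemo is hypothetical. It submits many near-empty tasks to a pool configured like the one above and counts how many tasks each thread executed. It does not guarantee the skew. The distribution depends on the CPU, the OS scheduler and the JVM. Every worker also runs at least the task it was created with.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

class TinyTaskDemo {
    // Submits n almost-empty tasks to a fixed pool (bounded queue of 50, CallerRunsPolicy)
    // and returns how many tasks each thread ran, keyed by thread name.
    static Map<String, Integer> tasksPerThread(int threads, int n) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(50), new ThreadPoolExecutor.CallerRunsPolicy());
        Map<String, LongAdder> counts = new ConcurrentHashMap<>();
        for (int i = 0; i < n; i++) {
            // the only "work" is recording which thread picked the task up
            pool.execute(() -> counts
                    .computeIfAbsent(Thread.currentThread().getName(), k -> new LongAdder())
                    .increment());
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        Map<String, Integer> result = new TreeMap<>();
        counts.forEach((name, c) -> result.put(name, c.intValue()));
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        // Varies from run to run; "main" appears only if CallerRunsPolicy kicked in on a full queue.
        System.out.println(tasksPerThread(4, 100_000));
    }
}
```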
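We might then start to suspect all sorts of things. Is some thread blocked? Is thread scheduling unfair? Did we pick the wrong queue? Did we hit a JDK bug? Is the pool failing to use all of its threads?
None of these is the cause. When we submit a task to the pool, all we really do is add it to the pool's queue, here the ArrayBlockingQueue. The pool's workers take tasks from that queue and run them. When only a few tasks are waiting in the queue, naturally only some of the workers have anything to do. Why is it always the same thread? In a fixed-size pool, each idle worker waits in ArrayBlockingQueue.take() (not poll()). take() is guarded by a ReentrantLock and its notEmpty Condition, using await/signal rather than Object.wait/notify. So which worker gets a task comes down to the fairness of that lock.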
Here is how a pool worker operates:
// java.util.concurrent.ThreadPoolExecutor#runWorker
/**
 * Main worker run loop. Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters. Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and then we
 * ensure that unless pool is stopping, this thread does not have
 * its interrupt set.
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to afterExecute.
 * We separately handle RuntimeException, Error (both of which the
 * specs guarantee that we trap) and arbitrary Throwables.
 * Because we cannot rethrow Throwables within Runnable.run, we
 * wrap them within Errors on the way out (to the thread's
 * UncaughtExceptionHandler). Any thrown exception also
 * conservatively causes thread to die.
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // The worker keeps taking tasks from the queue and running them.
        // Taking a task may time out or block indefinitely, depending on how the pool is configured to scale.
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // Either a timed poll() or a blocking take():
            // in a fixed-size pool (no core thread timeout) this is the blocking take()
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
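Strip away lifecycle states, interrupt handling, hooks and thread replacement, and the loop above boils down to something like the toy pool below. ToyPool is hypothetical and a simplification, not the JDK's implementation. Each worker blocks in take() and runs whatever it gets, and submitting a task only enqueues it. A poison-pill sentinel stands in for shutdown.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Toy fixed-size pool: each worker loops on queue.take(), the same shape as runWorker()/getTask()
// for a fixed pool, without lifecycle states, interrupt handling, hooks or thread replacement.
class ToyPool {
    private static final Runnable POISON = () -> { };   // sentinel telling a worker to exit
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread[] workers;

    ToyPool(int size) {
        workers = new Thread[size];
        for (int i = 0; i < size; i++) {
            workers[i] = new Thread(this::runWorker, "toy-worker-" + i);
            workers[i].start();
        }
    }

    private void runWorker() {
        try {
            while (true) {
                Runnable task = queue.take();           // blocks while the queue is empty
                if (task == POISON) {
                    return;
                }
                try {
                    task.run();
                } catch (RuntimeException e) {
                    e.printStackTrace();                 // unlike ThreadPoolExecutor, keep this worker alive
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submitting only enqueues; whichever worker takes the task first runs it.
    void execute(Runnable task) {
        queue.add(task);
    }

    // Queues one poison pill per worker behind all real tasks, then waits for every worker to exit.
    void shutdownAndWait() throws InterruptedException {
        for (int i = 0; i < workers.length; i++) {
            queue.add(POISON);
        }
        for (Thread w : workers) {
            w.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyPool pool = new ToyPool(4);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 1_000; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdownAndWait();
        System.out.println(done.get()); // 1000
    }
}
```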
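So a pool worker simply keeps taking tasks from the queue and running them. Taking a task from an ArrayBlockingQueue is in turn governed by a single ReentrantLock and its two Conditions, notEmpty and notFull: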
// java.util.concurrent.ArrayBlockingQueue#take
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // Take the lock so that dequeuing is thread-safe
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // await() releases the lock while waiting; a woken thread must re-acquire it before returning
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

/**
 * Inserts the specified element at the tail of this queue, waiting
 * for space to become available if the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // Wake up one thread waiting in take()
    notEmpty.signal();
}
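So which worker gets a task depends on which one wins the lock. By default, ArrayBlockingQueue uses a non-fair ReentrantLock. A worker that has just finished a very short task and comes straight back to take() can therefore grab the lock, and the new task, before a worker parked on notEmpty has even been woken up. The woken worker then finds the queue empty and goes back to waiting. That is most likely why, from the outside, nearly every task appears to be grabbed by the same thread.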
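The sketch below reproduces the same locking pattern in a hypothetical MiniBoundedQueue; it is not the JDK class. One ReentrantLock guards a ring buffer. put() waits on notFull, take() waits on notEmpty, and each side signals the other. The fair flag plays the role of the fair argument in ArrayBlockingQueue's (capacity, fair) constructor, which defaults to false. According to the JDK documentation, fairness generally lowers throughput in exchange for less variability. Whether it changes how tasks spread across workers in a given workload is something to measure rather than assume.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Bounded FIFO queue with ArrayBlockingQueue's locking shape:
// one ReentrantLock plus two Conditions ("not empty" and "not full").
class MiniBoundedQueue<E> {
    private final Object[] items;
    private int takeIndex, putIndex, count;
    private final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

    // fair=false lets a running thread acquire the lock ahead of threads already queued for it
    MiniBoundedQueue(int capacity, boolean fair) {
        items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();                       // releases the lock while waiting
            }
            items[putIndex] = e;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();                         // wake one taker; it must still re-acquire the lock
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();                      // loop: another thread may have taken the item first
            }
            E e = (E) items[takeIndex];
            items[takeIndex] = null;
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniBoundedQueue<Integer> q = new MiniBoundedQueue<>(2, false);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) q.put(i);  // blocks whenever 2 items are waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < 10; i++) sum += q.take();    // blocks whenever the queue is empty
        producer.join();
        System.out.println(sum); // 55
    }
}
```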
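3. Back to the thread pool
A thread pool exists to handle asynchronous tasks or to run several unrelated tasks concurrently, taking load off the system. When submitting a task costs more than executing it, multithreading is pointless; put differently, the pool is being used wrongly. We should give the thread pool heavy work, not lightweight work. In the case above, performance is bound to be poor. With a small change, though, things may look quite different: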
// Initialize the thread pool
private ExecutorService executor = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        Runtime.getRuntime().availableProcessors(),
        0L, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(50),
        new NamedThreadFactory("test-pool"),
        new ThreadPoolExecutor.CallerRunsPolicy());

// Process the tasks with the thread pool
public Integer doTask(String updateIntervalDesc) throws Exception {
    long startTime = System.currentTimeMillis();
    List<TestDto> testList;
    AtomicInteger affectNum = new AtomicInteger(0);
    int pageSize = 1000;
    AtomicInteger pageNo = new AtomicInteger(1);
    Map<String, Object> condGroupLabel = new HashMap<>();
    log.info("start do sth:{}", updateIntervalDesc);
    List<Future<?>> futureList = new ArrayList<>();
    do {
        PageHelper.startPage(pageNo.getAndIncrement(), pageSize);
        List<TestDto> list = testDao.getLabelListNew(condGroupLabel);
        testList = list;
        // Submit only one task per batch (page) to the thread pool
        Future<?> future = executor.submit(() -> {
            for (TestDto s : list) {
                try {
                    // do sth...
                    affectNum.incrementAndGet();
                } catch (Throwable e) {
                    log.error("error:{}", pageNo.get(), e);
                }
            }
        });
        futureList.add(future);
    } while (testList.size() >= pageSize);
    // Wait for all tasks to finish
    int i = 0;
    for (Future<?> future : futureList) {
        future.get();
        log.info("done:+{} ", i++);
    }
    log.info("doTask done:{}, num:{}, cost:{}ms",
            updateIntervalDesc, affectNum.get(), System.currentTimeMillis() - startTime);
    return affectNum.get();
}
In other words, make each task heavy enough that the cost of submitting it becomes negligible. Only then does multithreading really pay off.
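Here is the same batching idea as a self-contained sketch. A plain List<Integer> stands in for the database pages, and summing stands in for the per-item work; both are assumptions for illustration, and BatchSubmitDemo is a hypothetical name. The sketch submits one task per batch of batchSize items and then waits on all the Futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.LongAdder;

class BatchSubmitDemo {
    // Submits ONE task per batch of `batchSize` items instead of one task per item,
    // so each task carries enough work to make the submission overhead negligible.
    static long sumInBatches(List<Integer> items, int batchSize, ExecutorService pool)
            throws InterruptedException, ExecutionException {
        LongAdder sum = new LongAdder();
        List<Future<?>> futures = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            List<Integer> batch = items.subList(from, Math.min(from + batchSize, items.size()));
            futures.add(pool.submit(() -> {
                for (Integer item : batch) {
                    sum.add(item); // stand-in for the real per-item work ("do sth" in the article)
                }
            }));
        }
        for (Future<?> f : futures) {
            f.get(); // wait for every batch; a batch's exception resurfaces as ExecutionException
        }
        return sum.sum();
    }

    public static void main(String[] args) throws Exception {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 10_000; i++) items.add(i);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(sumInBatches(items, 1000, pool)); // 50005000
        } finally {
            pool.shutdown();
        }
    }
}
```

A good batch size depends on the per-item cost. The 1000 used here simply mirrors the article's pageSize.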

Author: 等你歸去來
Source: https://www.cnblogs.com/yougewe/p/14421826.html
