沒想到,這么簡單的線程池用法,深藏這么多坑
點擊藍色“程序通事”關(guān)注我喲
加個“星標”,不迷路哦
又又又踩坑了
生產(chǎn)有個對賬系統(tǒng),每天需要從渠道端下載對賬文件,然后開始日終對賬。這個系統(tǒng)已經(jīng)運行了很久,前兩天突然收到短信預(yù)警,沒有獲取渠道端對賬文件。
“ps:對賬系統(tǒng)詳細實現(xiàn)方式:聊聊對賬系統(tǒng)的設(shè)計方案
本以為又是渠道端搞事情,上去一排查才發(fā)現(xiàn),所有下載任務(wù)都被阻塞了。再進一步排查源碼,才發(fā)現(xiàn)自己一直用錯了線程池某個方法。
由于線程創(chuàng)建比較昂貴,正式項目中我們都會使用線程池執(zhí)行異步任務(wù)。線程池,使用池化技術(shù)保存線程對象,使用的時候直接取出來,用完歸還以便使用。
雖然線程池的使用非常方法非常簡單,但是越簡單,越容易踩坑。細數(shù)一下,這些年來因為線程池導(dǎo)致生產(chǎn)事故也有好幾起。
所以今天,小黑哥就針對線程池的話題,給大家演示一下怎么使用線程池才會踩坑。
希望大家看完,可以完美避開這些坑~
慎用 Executors 組件
Java 從 JDK1.5 開始提供線程池的實現(xiàn)類,我們只需要在構(gòu)造函數(shù)內(nèi)傳入相關(guān)參數(shù),就可以創(chuàng)建一個線程池。

不過線程池的構(gòu)造函數(shù)可以說非常復(fù)雜,就算最簡單的那個構(gòu)造函數(shù),也需要傳入 5 個參數(shù)。這對于新手來說,非常不方便哇。
也許 JDK 開發(fā)者也考慮到這個問題,所以非常貼心給我們提供一個工具類 Executors,用來快捷創(chuàng)建創(chuàng)建線程池。
雖然這個工具類使用真的非常方便,可以少寫很多代碼,但是小黑哥還是建議生產(chǎn)系統(tǒng)還是老老實實手動創(chuàng)建線程池,慎用Executors,尤其是工具類中兩個方法 ?Executors#newFixedThreadPool與 Executors#newCachedThreadPool。
如果你圖了方便使用上述方法創(chuàng)建了線程池,那就是一顆定時炸彈,說不準那一天生產(chǎn)系統(tǒng)就會?。
我們來看兩個?,看下這個這兩個方法會有什么問題。
假設(shè)我們有個應(yīng)用有個批量接口,每次請求將會下載 100w 個文件,這里我們使用 Executors#newFixedThreadPool批量下載。
“下面方法中,我們隨機休眠,模擬真實下載耗時。
為了快速復(fù)現(xiàn)問題,調(diào)整 JVM 參數(shù)為
-Xmx128m -Xms128m。
private?ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);
/**
?*?批量下載對賬文件
?*
?*?@return
?*/
@RequestMapping("/batchDownload")
public?String?batchDownload()?{
????
????//?模擬下載?100w?個文件
????for?(int?i?=?0;?i?1000000;?i++)?{
????????threadPool.execute(()?->?{
????????????//?隨機休眠,模擬下載耗時
????????????Random?random?=?new?Random();
????????????try?{
????????????????TimeUnit.SECONDS.sleep(random.nextInt(100));
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????});
????}
????return?"process";
}
程序運行之后,多請求幾次這個批量下載方法,程序很快就會 OOM 。

查看 Executors#newFixedThreadPool源碼,我們可以看到這個方法創(chuàng)建了一個默認的 LinkedBlockingQueue 當做任務(wù)隊列。
public?static?ExecutorService?newFixedThreadPool(int?nThreads)?{
????return?new?ThreadPoolExecutor(nThreads,?nThreads,
??????????????????????????????????0L,?TimeUnit.MILLISECONDS,
??????????????????????????????????new?LinkedBlockingQueue());
}
這個問題槽點就在于 LinkedBlockingQueue,這個隊列的默認構(gòu)造方法如下:
/**
?*?Creates?a?{@code?LinkedBlockingQueue}?with?a?capacity?of
?*?{@link?Integer#MAX_VALUE}.
?*/
public?LinkedBlockingQueue()?{
????this(Integer.MAX_VALUE);
}
創(chuàng)建 LinkedBlockingQueue 隊列時,如果我們不指定隊列數(shù)量,默認數(shù)量上限為 Integer.MAX_VALUE。這么大的數(shù)量,我們簡直可以當做無界隊列了。
上面我們使用 newFixedThreadPool,我們僅使用了固定數(shù)量的線程下載。如果線程都在執(zhí)行任務(wù),線程池將會任務(wù)加入任務(wù)隊列中。
如果線程池執(zhí)行任務(wù)過慢,任務(wù)將會一直堆積在隊列中。由于我們隊列可以認為是無界的,可以無限制添加任務(wù),這就導(dǎo)致內(nèi)存占用越來越高,直到 OOM 爆倉。
下面我們將上面的例子稍微修改一下,使用 newCachedThreadPool 創(chuàng)建線程池。
程序運行之后,多請求幾次這個批量下載方法,程序很快就會 OOM ,不過這次報錯信息與之前信息與之前不同。

從報錯信息來看,這次 OOM 的主要原因是因為無法再創(chuàng)建新的線程。
這次看下一下 newCachedThreadPool 方法的源碼,可以看到這個方法將會創(chuàng)建最大線程數(shù)為 Integer.MAX_VALUE 的的線程池。

由于這個線程池使用 SynchronousQueue 隊列,這個隊列比較特殊,沒辦法存儲任務(wù)。所以默認情況下,線程池只要接到一個任務(wù),就會創(chuàng)建一個線程。
一旦線程池收到大量任務(wù),就會創(chuàng)建大量線程。Java 中的線程是會占用一定的內(nèi)存空間 ,所以創(chuàng)建大量的線程是必然會導(dǎo)致 OOM。
復(fù)用線程池
由于線程池的構(gòu)造方法比較復(fù)雜,而 Executors 創(chuàng)建的線程池比較坑,所以我們有個項目中自己封裝了一個線程池工具類。
工具類代碼如下:
public?static?ThreadPoolExecutor?getThreadPool()?{
????//?為了快速復(fù)現(xiàn)問題,故將線程池?核心線程數(shù)與最大線程數(shù)設(shè)置為?100
????return?new?ThreadPoolExecutor(100,?100,?60,?TimeUnit.SECONDS,?new?LinkedBlockingDeque<>(200));
}
項目代碼中這樣使用這個工具類:
@RequestMapping("/batchDownload")
public?String?batchDownload()?{
????ExecutorService?threadPool?=?ThreadPoolUtils.getThreadPool();
????//?模擬下載?100w?個文件
????for?(int?i?=?0;?i?100;?i++)?{
????????threadPool.execute(()?->?{
????????????//?隨機休眠,模擬下載耗時
????????????Random?random?=?new?Random();
????????????try?{
????????????????TimeUnit.SECONDS.sleep(random.nextInt(100));
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????});
????}
????return?"process";
}
使用 WRK 工具對這個接口同時發(fā)起多個請求,很快應(yīng)用就會拋出 OOM。

每次請求都會創(chuàng)建一個新的線程池執(zhí)行任務(wù),如果短時間內(nèi)有大量的請求,就會創(chuàng)建很多的線程池,間接導(dǎo)致創(chuàng)建很多線程。從而導(dǎo)致內(nèi)存占盡,發(fā)生 OOM 問題。
這個問題修復(fù)辦法很簡單,要么工具類生成一個單例線程池,要么項目代碼中復(fù)用創(chuàng)建出來的線程池。
Spring 異步任務(wù)
上面代碼中我們都是自己創(chuàng)建一個線程池執(zhí)行異步任務(wù),這樣還是比較麻煩。在 Spring 中, 我們可以在方法上使用 Spring 注解 @Async,然后執(zhí)行異步任務(wù)。
代碼如下:
@Async
public?void?async()?throws?InterruptedException?{
????log.info("async?process");
????Random?random?=?new?Random();
????TimeUnit.SECONDS.sleep(random.nextInt(100));
}
不過使用 Spring 異步任務(wù),我們需要自定義線程池,不然大量請求下,還是有可能發(fā)生 OOM 問題。
這是原因主要是 Spring 異步任務(wù)默認使用 Spring 內(nèi)部線程池 ?SimpleAsyncTaskExecutor 。
image-20200627191850022這個線程池比較坑爹,不會復(fù)用線程。也就是說來一個請求,將會新建一個線程。
所以如果需要使用異步任務(wù),一定要使用自定義線程池替換默認線程池。
如果使用 XML 配置,我們可以增加如下配置:
<task:executor?id="myexecutor"?pool-size="5"??/>
<task:annotation-driven?executor="myexecutor"/>
如果使用注解配置,我們需要設(shè)置一個 Bean:
@Bean(name?=?"threadPoolTaskExecutor")
public?Executor?threadPoolTaskExecutor()?{
????ThreadPoolTaskExecutor?executor=new?ThreadPoolTaskExecutor();
????executor.setCorePoolSize(5);
????executor.setMaxPoolSize(10);
????executor.setThreadNamePrefix("test-%d");
????//?其他設(shè)置
????return?new?ThreadPoolTaskExecutor();
}
然后使用注解時指定線程池名稱:
@Async("threadPoolTaskExecutor")
public?void?xx()?{
????//?業(yè)務(wù)邏輯
}
如果是 SpringBoot 項目,從本人測試情況來看,默認將會創(chuàng)建核心線程數(shù)為 8,最大線程數(shù)為 Integer.MAX_VALUE,隊列數(shù)也為 Integer.MAX_VALUE線程池。
“ps:以下代碼基于 Spring-Boot 2.1.6-RELEASE,暫不確定 Spring-Boot 1.x 版本是否也是這種策略,熟悉的同學(xué)的,也可以留言指出一下。

雖然上面的線程池不用擔(dān)心創(chuàng)建過多線程的問題,不是還是有可能隊列任務(wù)過多,導(dǎo)致 OOM 的問題。所以還是建議使用自定義線程池嗎,或者在配置文件修改默認配置,例如:
spring.task.execution.pool.core-size=10
spring.task.execution.pool.max-size=20
spring.task.execution.pool.queue-capacity=200
線程池方法使用不當
最后再來說下文章開頭的我踩到的這個坑,這個問題主要是因為理解錯這個方法。
錯誤代碼如下:
//?創(chuàng)建線程池
ExecutorService?threadPool?=?...
List>?tasks?=?new?ArrayList<>();
//?批量創(chuàng)建任務(wù)
for?(int?i?=?0;?i?100;?i++)?{
????tasks.add(()?->?{
????????Random?random?=?new?Random();
????????try?{
????????????TimeUnit.SECONDS.sleep(random.nextInt(100));
????????}?catch?(InterruptedException?e)?{
????????????e.printStackTrace();
????????}
????????return?"success";
????});
}
//?執(zhí)行所有任務(wù)
List>?futures?=?threadPool.invokeAll(tasks);
//?獲取結(jié)果
for?(Future?future?:?futures)?{
????try?{
????????future.get();
????}?catch?(ExecutionException?e)?{
????????e.printStackTrace();
????}
}
上面代碼中,使用 invokeAll執(zhí)行所有任務(wù)。由于這個方法返回值為 List,我誤以為這個方法如 submit一樣,異步執(zhí)行,不會阻塞主線程。
實際上從源碼上,這個方法實際上逐個調(diào)用 Future#get獲取任務(wù)結(jié)果,而這個方法會同步阻塞主線程。

一旦某個任務(wù)被永久阻塞,比如 Socket ?網(wǎng)絡(luò)連接位置超時時間,導(dǎo)致任務(wù)一直阻塞在網(wǎng)絡(luò)連接,間接導(dǎo)致這個方法一直被阻塞,從而影響后續(xù)方法執(zhí)行。
如果需要使用 invokeAll 方法,最好使用其另外一個重載方法,設(shè)置超時時間。

總結(jié)
今天文章通過幾個例子,給大家展示了一下線程池使用過程一些坑。為了快速復(fù)現(xiàn)問題,上面的示例代碼還是比較極端,實際中可能并不會這么用。
不過即使這樣,我們千萬不要抱著僥幸的心理,認為這些任務(wù)很快就會執(zhí)行結(jié)束。我們在生產(chǎn)上碰到好幾次事故,正常的情況執(zhí)行都很快。但是偶爾外部程序抽瘋,返回時間變長,就可能導(dǎo)致系統(tǒng)中存在大量任務(wù),導(dǎo)致 OOM。
最后總結(jié)一下幾個線程池幾個最佳實踐:
第一,生產(chǎn)系統(tǒng)慎用 Executors 類提供的便捷方法,我們需要自己根據(jù)自己的業(yè)務(wù)場景,配置合理的線程數(shù),任務(wù)隊列,拒絕策略,線程回收策略等等,并且一定記得自定義線程池的命名方式,以便于后期排查問題。
第二,線程池不要重復(fù)創(chuàng)建,每次都創(chuàng)建一個線程池可能比不用線程池還要糟糕。如果使用其他同學(xué)創(chuàng)建的線程池工具類,最好還是看一下實現(xiàn)方式,防止自己誤用。
第三,一定不要按照自己的片面理解去使用 API 方法,如果把握不準,一定要去看下方法上注釋以及相關(guān)源碼。
最后最后(點個在看唄)
明天開始就要進入小黑屋封閉開發(fā)了(?_?) ,你們接下去一個月可能就看見不到我了~

所以,所以,可以來個在看嗎~
所以,所以,可以來個在看嗎~
所以,所以,可以來個在看嗎~
造了一個 Redis 分布鎖的輪子,沒想到還學(xué)到這么多東西!?。?/a>
MySQL 可重復(fù)讀,差點就讓我背上了一個 P0 事故!
