多線程使用不當(dāng)導(dǎo)致的 OOM
往期熱門文章: 1、老板要我開發(fā)一個(gè)簡單的工作流引擎 2、Spring Boot 啟動(dòng)時(shí)自動(dòng)執(zhí)行代碼的幾種方式,還有誰不會(huì)?? 3、Lombok原理和同時(shí)使?@Data和@Builder 的坑 4、如何用 Java 幾分鐘處理完 30 億個(gè)數(shù)據(jù)? 5、面試官 | Spring Boot 項(xiàng)目如何統(tǒng)一結(jié)果,統(tǒng)一異常,統(tǒng)一日志?
事故描述
整體經(jīng)過
事故根本原因
public static void test() throws InterruptedException, ExecutionException {
Executor executor = Executors.newFixedThreadPool(3);
CompletionService<String> service = new ExecutorCompletionService<>(executor);
service.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "HelloWorld--" + Thread.currentThread().getName();
}
});
}
ExecutorCompletionService 結(jié)果沒調(diào)用take、poll方法。public static void test() throws InterruptedException, ExecutionException {
Executor executor = Executors.newFixedThreadPool(3);
CompletionService<String> service = new ExecutorCompletionService<>(executor);
service.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "HelloWorld--" + Thread.currentThread().getName();
}
});
service.take().get();
}
探尋問題根源
ExecutorCompletionService 的 “套路”,我們用 ExecutorService 來作為對比,可以讓我們更好地清楚什么場景下用 ExecutorCompletionService。ExecutorService 代碼(建議下載后自己跑一跑)public static void test1() throws Exception{
ExecutorService executorService = Executors.newCachedThreadPool();
ArrayList<Future<String>> futureArrayList = new ArrayList<>();
System.out.println("公司讓你通知大家聚餐 你開車去接人");
Future<String> future10 = executorService.submit(() -> {
System.out.println("總裁:我在家上大號 我最近拉肚子比較慢 要蹲1個(gè)小時(shí)才能出來 你等會(huì)來接我吧");
TimeUnit.SECONDS.sleep(10);
System.out.println("總裁:1小時(shí)了 我上完大號了。你來接吧");
return "總裁上完大號了";
});
futureArrayList.add(future10);
Future<String> future3 = executorService.submit(() -> {
System.out.println("研發(fā):我在家上大號 我比較快 要蹲3分鐘就可以出來 你等會(huì)來接我吧");
TimeUnit.SECONDS.sleep(3);
System.out.println("研發(fā):3分鐘 我上完大號了。你來接吧");
return "研發(fā)上完大號了";
});
futureArrayList.add(future3);
Future<String> future6 = executorService.submit(() -> {
System.out.println("中層管理:我在家上大號 要蹲10分鐘就可以出來 你等會(huì)來接我吧");
TimeUnit.SECONDS.sleep(6);
System.out.println("中層管理:10分鐘 我上完大號了。你來接吧");
return "中層管理上完大號了";
});
futureArrayList.add(future6);
TimeUnit.SECONDS.sleep(1);
System.out.println("都通知完了,等著接吧。");
try {
for (Future<String> future : futureArrayList) {
String returnStr = future.get();
System.out.println(returnStr + ",你去接他");
}
Thread.currentThread().join();
} catch (Exception e) {
e.printStackTrace();
}
}
第一步:主線程把三個(gè)任務(wù)提交到線程池里面去,把對應(yīng)返回的 Future 放到 List 里面存起來,然后執(zhí)行“都通知完了,等著接吧?!边@行輸出語句;
第二步:在循環(huán)里面執(zhí)行
future.get()操作,阻塞等待。
ExecutorService 這種方式,并且恰巧前幾個(gè)任務(wù)調(diào)用的接口耗時(shí)比較久,同時(shí)阻塞等待,那就比較悲催了。所以 ExecutorCompletionService 應(yīng)景而出。它作為任務(wù)線程的合理管控者,“任務(wù)規(guī)劃師”的稱號名副其實(shí)。ExecutorCompletionService 代碼:public static void test2() throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
System.out.println("公司讓你通知大家聚餐 你開車去接人");
completionService.submit(() -> {
System.out.println("總裁:我在家上大號 我最近拉肚子比較慢 要蹲1個(gè)小時(shí)才能出來 你等會(huì)來接我吧");
TimeUnit.SECONDS.sleep(10);
System.out.println("總裁:1小時(shí)了 我上完大號了。你來接吧");
return "總裁上完大號了";
});
completionService.submit(() -> {
System.out.println("研發(fā):我在家上大號 我比較快 要蹲3分鐘就可以出來 你等會(huì)來接我吧");
TimeUnit.SECONDS.sleep(3);
System.out.println("研發(fā):3分鐘 我上完大號了。你來接吧");
return "研發(fā)上完大號了";
});
completionService.submit(() -> {
System.out.println("中層管理:我在家上大號 要蹲10分鐘就可以出來 你等會(huì)來接我吧");
TimeUnit.SECONDS.sleep(6);
System.out.println("中層管理:10分鐘 我上完大號了。你來接吧");
return "中層管理上完大號了";
});
TimeUnit.SECONDS.sleep(1);
System.out.println("都通知完了,等著接吧。");
//提交了3個(gè)異步任務(wù))
for (int i = 0; i < 3; i++) {
String returnStr = completionService.take().get();
System.out.println(returnStr + ",你去接他");
}
Thread.currentThread().join();
}

ExecutorCompletionService 使用了:completionService.take().get();
take() 然后再 get() 呢?CompletionService 接口以及接口的實(shí)現(xiàn)類1、ExecutorCompletionService 是 CompletionService 接口的實(shí)現(xiàn)類

2、接著跟一下 ExecutorCompletionService 的構(gòu)造方法。
LinkedBlockingQueue,不過還有另外一個(gè)構(gòu)造方法可以指定隊(duì)列類型,如下兩張圖,有兩個(gè)構(gòu)造方法。默認(rèn) LinkedBlockingQueue 的構(gòu)造方法。

3、Submit 任務(wù)提交的兩種方式,都是有返回值的,我們例子中用到的就是第一種 Callable 類型的方法。

4、對比ExecutorService 和 ExecutorCompletionService 的 submit 方法可以看出區(qū)別。


5、差異就在 QueueingFuture。
QueueingFuture繼承自FutureTask,而且紅線部分標(biāo)注的位置,重寫了 done() 方法;把 task 放到
completionQueue隊(duì)列里面。當(dāng)任務(wù)執(zhí)行完成后,task 就會(huì)被放到隊(duì)列里面去了;此時(shí)此刻,
completionQueue隊(duì)列里面的 task 都是已經(jīng)done()完成了的 task。而這個(gè) task 就是我們拿到的一個(gè)個(gè)的 future 結(jié)果;如果調(diào)用
completionQueue的 task 方法,會(huì)阻塞等待任務(wù)。等到的一定是完成了的 future,我們調(diào)用.get()方法 就能立馬獲得結(jié)果。

我們在使用
ExecutorService submit提交任務(wù)后需要關(guān)注每個(gè)任務(wù)返回的 future。然而CompletionService對這些 future 進(jìn)行了追蹤,并且重寫了 done 方法,讓你等的 completionQueue 隊(duì)列里面一定是完成了的 task;作為網(wǎng)關(guān) RPC 層,我們不用因?yàn)槟骋粋€(gè)接口的響應(yīng)慢拖累所有的請求,可以在處理最快響應(yīng)的業(yè)務(wù)場景里使用
CompletionService。
但是請注意!也是本次事故的核心問題。
ExecutorCompletionService 下面的 3 個(gè)方法的任意一個(gè)時(shí),阻塞隊(duì)列中的 task 執(zhí)行結(jié)果才會(huì)從隊(duì)列中移除掉,釋放堆內(nèi)存。
CompletionService。假如使用了,記得一定要從阻塞隊(duì)列中移除掉 task 執(zhí)行結(jié)果,避免 OOM!總結(jié)
上線前
嚴(yán)格的代碼 review 習(xí)慣,一定要交給 back 人去看,畢竟自己寫的代碼自己是看不出問題的,相信每個(gè)程序猿都有這個(gè)自信;
上線記錄:備注好上一個(gè)可回滾的包版本(給自己留一個(gè)后路);
上線前確認(rèn)回滾后,業(yè)務(wù)是否可降級。如果不可降級,一定要嚴(yán)格拉長這次上線的監(jiān)控周期。
上線后
持續(xù)關(guān)注內(nèi)存增長情況(這部分極容易被忽略,大家對內(nèi)存的重視度不如 CPU 使用率);
持續(xù)關(guān)注 CPU 使用率增長情況
GC 情況、線程數(shù)是否增長、是否有頻繁的 Full GC 等;
關(guān)注服務(wù)性能報(bào)警,TP99、999 、MAX 是否出現(xiàn)明顯的增高。
來源:juejin.cn/post/7064376361334358046
最近熱文閱讀:
1、面試官 | Spring Boot 項(xiàng)目如何統(tǒng)一結(jié)果,統(tǒng)一異常,統(tǒng)一日志? 2、為什么不建議使用ON DUPLICATE KEY UPDATE? 3、Java8 Stream,過分絲滑! 4、8 種最坑SQL語法,工作中踩過嗎? 5、Java 語言“坑爹” TOP 10 6、你還不明白如何解決分布式Session?看這篇就夠了! 7、能解決 80% 故障的排查思路 8、程序員坐牢了,會(huì)被安排寫代碼嗎? 9、面試被問Nginx,怎么破? 10、為什么很多 SpringBoot 開發(fā)者放棄了 Tomcat,選擇了 Undertow? 關(guān)注公眾號,你想要的Java都在這里
