Java 異步調(diào)用原理與實(shí)戰(zhàn)經(jīng)驗(yàn)總結(jié)
本文介紹了線上業(yè)務(wù)中的一些異步調(diào)用實(shí)踐經(jīng)驗(yàn),包含 ?IO 模型介紹、CompletableFuture 的基本使用、RPC 異步調(diào)用、異步 HTTP 客戶端 Spring WebClient 的使用等。RPC 使用前文介紹的手寫 RPC 框架,該框架支持異步調(diào)用。
本文要點(diǎn):
- 為什么需要異步調(diào)用
- CompletableFuture 基本使用
- RPC 異步調(diào)用
- HTTP 異步調(diào)用
- 編排 CompletableFuture 提高吞吐量
為什么異步
BIO 模型
首先我們先回顧一下 BIO 模型:
BIO 模型當(dāng)用戶進(jìn)程調(diào)用了recvfrom 這個(gè)系統(tǒng)調(diào)用,kernel 就開始了 IO 的第一個(gè)階段:準(zhǔn)備數(shù)據(jù)。對(duì)于 network io 來說,很多時(shí)候數(shù)據(jù)在一開始還沒有到達(dá)(比如,還沒有收到一個(gè)完整的UDP包),這個(gè)時(shí)候 kernel 就要等待足夠的數(shù)據(jù)到來。而在用戶進(jìn)程這邊,整個(gè)進(jìn)程會(huì)被阻塞。當(dāng) kernel 一直等到數(shù)據(jù)準(zhǔn)備好了,它就會(huì)將數(shù)據(jù)從 kernel 中拷貝到用戶內(nèi)存,然后 kernel 返回結(jié)果,用戶進(jìn)程才解除 block 的狀態(tài),重新運(yùn)行起來。所以,Blocking IO 的特點(diǎn)就是在 IO 執(zhí)行的兩個(gè)階段都被 block 了。
同步調(diào)用
同步調(diào)用在同步調(diào)用的場(chǎng)景下,依次請(qǐng)求多個(gè)接口,耗時(shí)長(zhǎng)、性能差,接口響應(yīng)時(shí)長(zhǎng) T > T1+T2+T3+……+Tn。
減少同步等待
一般這個(gè)時(shí)候?yàn)榱藴p少同步等待時(shí)間,會(huì)使用線程池來同時(shí)處理多個(gè)任務(wù),接口的響應(yīng)時(shí)間就是 MAX(T1,T2,T3):
線程池異步大概代碼如下:
Future<String>?future?=?executorService.submit(()?->?{
??Thread.sleep(2000);
??return?"hello?world";
});
while?(true)?{
??if?(future.isDone())?{
????System.out.println(future.get());
????break;
??}
}
同步模型中使用線程池確實(shí)能實(shí)現(xiàn)異步調(diào)用的效果,也能壓縮同步等待的時(shí)間,但是也有一些缺陷:
CPU 資源大量浪費(fèi)在阻塞等待上,導(dǎo)致 CPU 資源利用率低。
為了增加并發(fā)度,會(huì)引入更多額外的線程池,隨著 CPU 調(diào)度線程數(shù)的增加,會(huì)導(dǎo)致更嚴(yán)重的資源爭(zhēng)用,上下文切換占用 CPU 資源。
線程池中的線程都是阻塞的,硬件資源無法充分利用,系統(tǒng)吞吐量容易達(dá)到瓶頸。
NIO 模型
為了解決 BIO 中的缺陷,引入 NIO 模型:
NIO 模型當(dāng)用戶進(jìn)程發(fā)出 read 操作時(shí),如果 kernel 中的數(shù)據(jù)還沒有準(zhǔn)備好,那么它并不會(huì) block 用戶進(jìn)程,而是立刻返回一個(gè) error。從用戶進(jìn)程角度講 ,它發(fā)起一個(gè) read 操作后,并不需要等待,而是馬上就得到了一個(gè)結(jié)果。用戶進(jìn)程判斷結(jié)果是一個(gè) error 時(shí),它就知道數(shù)據(jù)還沒有準(zhǔn)備好,于是它可以再次發(fā)送 read 操作。一旦 kernel 中的數(shù)據(jù)準(zhǔn)備好了,并且又再次收到了用戶進(jìn)程的 system call,那么它馬上就將數(shù)據(jù)拷貝到了用戶內(nèi)存,然后返回。所以,用戶進(jìn)程其實(shí)是需要不斷的主動(dòng)詢問 kernel 數(shù)據(jù)好了沒有。
異步優(yōu)化思路
我們知道了 NIO 的調(diào)用方式比 BIO 好,那我們?cè)趺茨茉跇I(yè)務(wù)編碼中使用到 NIO 呢?自己動(dòng)手將 BIO 替換成 NIO 肯定不現(xiàn)實(shí),已有組件支持 NIO 的可以直接使用,不支持的繼續(xù)使用自定義線程池。
- 通過 RPC NIO 異步調(diào)用、 HTTP 異步調(diào)用的方式降低線程數(shù),從而降低調(diào)度(上下文切換)開銷。
- 沒有原生支持 NIO 異步調(diào)用的繼續(xù)使用線程池。
- 引入 CompletableFuture 對(duì)業(yè)務(wù)流程進(jìn)行編排,降低依賴之間的阻塞。
簡(jiǎn)述CompletableFuture
CompletableFuture 是 java.util.concurrent 庫(kù)在 java 8 中新增的主要工具,同傳統(tǒng)的 Future 相比,其支持流式計(jì)算、函數(shù)式編程、完成通知、自定義異常處理等很多新的特性。
常用 API 舉例
supplyAsync
CompletableFuture<String>?future?=?CompletableFuture.supplyAsync(()->{????
??try{
????Thread.sleep(1000L);
????return?"hello?world";
??}?catch?(Exception?e){
????return?"failed";
??}
});
System.out.println(future.join());
//?output
hello?world
開啟異步任務(wù),到另一個(gè)線程執(zhí)行。
complete
CompletableFuture<String>?future1?=?new?CompletableFuture<>();
future.complete("hello?world");?????//異步線程執(zhí)行
future.whenComplete((res,?throwable)?->?{
??System.out.println(res);
});
System.out.println(future1.join());
CompletableFuture<String>?future2?=?new?CompletableFuture<>();
future.completeExceptionally(new?Throwable("failed"));?//異步線程執(zhí)行
System.out.println(future2.join());
//?output
hello?world
hello?world
??
Exception?in?thread?"main"?
java.util.concurrent.CompletionException:?
java.lang.Throwable:?failed
complete 正常完成該 CompletableFuture。
completeExceptionally 異常完成該 CompletableFuture。
thenApply
String?original?=?"Message";
CompletableFuture<String>?cf?=?
?CompletableFuture.completedFuture(original).thenApply(String::toUpperCase);
System.out.println(cf.join());
//?output
MESSAGE
任務(wù)后置處理。
圖示:
thenApply 圖示thenCombine
CompletableFuture<String>?cf?=?
?CompletableFuture.completedFuture("Message").thenApply(String::toUpperCase);
CompletableFuture<String>?cf1?=?
?CompletableFuture.completedFuture("Message").thenApply(String::toLowerCase);
CompletableFuture<String>?allCf?=?cf.thenCombine(cf1,?(s1,?s2)?->?s1?+?s2);
System.out.println(allCf.join());
//?output
MSGmsg
合并任務(wù),兩個(gè)任務(wù)同時(shí)執(zhí)行,結(jié)果由合并函數(shù) BiFunction 返回。
圖示:
thenCombine 圖示allOf
CompletableFuture<String>?future1?=?CompletableFuture.supplyAsync(()?->?"Message1");
CompletableFuture<String>?future2?=?CompletableFuture.supplyAsync(()?->?"Message2");
CompletableFuture<String>?future3?=?CompletableFuture.supplyAsync(()?->?"Message3");
CompletableFuture<String>?future?=?
?CompletableFuture.allOf(future1,?future2,?future3).thenApply(v?->?{
??String?join1?=?future1.join();
??String?join2?=?future2.join();
??String?join3?=?future3.join();
??return?join1?+?join2?+?join3;});
System.out.println(future.join());
//?output
Msg1Msg2Msg3
allOf 會(huì)阻塞等待所有異步線程任務(wù)結(jié)束。
allOf 里的 join 并不會(huì)阻塞,傳給 thenApply 的函數(shù)是在 future1, future2, future3 全部完成時(shí),才會(huì)執(zhí)行 。
圖示:
allOf 圖示CF 執(zhí)行線程
下面有兩個(gè)小demo,可以先試著想想輸出的結(jié)果:
String?original?=?"Message";
CompletableFuture?cf?=?CompletableFuture.supplyAsync(()?->?{
??System.out.println("supplyAsync?thread:?"?+?Thread.currentThread().getName());
??return?original;
}).thenApply(r?->?{
??System.out.println("thenApply?thread:?"?+?Thread.currentThread().getName());
??return?r;
});
System.out.println(cf.join());
//?output
supplyAsync?thread:?ForkJoinPool.commonPool-worker-1
thenApply?thread:?main
Message
String?original?=?"Message";
CompletableFuture?cf?=?CompletableFuture.supplyAsync(()?->?{
??System.out.println("supplyAsync?thread:?"?+?Thread.currentThread().getName());
??try?{
????Thread.sleep(100);
??}?catch?(InterruptedException?e)?{
????throw?new?RuntimeException(e);
??}
??return?original;
}).thenApply(r?->?{
??System.out.println("thenApply?thread:?"?+?Thread.currentThread().getName());
??return?r;
});
System.out.println(cf.join());
//?output
supplyAsync?thread:?ForkJoinPool.commonPool-worker-1
thenApply?thread:?ForkJoinPool.commonPool-worker-1
Message
先看結(jié)論:
- 執(zhí)行 complete 的線程會(huì)執(zhí)行當(dāng)前調(diào)用鏈上的所有CF。
- 如果 CF 提前 complete,后續(xù) CF 由初始線程執(zhí)行。
異步任務(wù)里沒有 sleep 的時(shí)候,異步任務(wù)很快就會(huì)完成,意味著 JVM 執(zhí)行到 thenApply 的時(shí)候,前置 CF 已經(jīng)提前完成所以后續(xù)的 CF 會(huì)被初始線程 main 線程執(zhí)行。
異步任務(wù)里有 sleep 的時(shí)候, JVM 執(zhí)行到 thenApply 時(shí),前置 CF 還沒有完成,前置 CF complete 的線程會(huì)執(zhí)行所有后續(xù)的 CF。
CF 嵌套join
ExecutorService?executorService?=?Executors.newFixedThreadPool(2);
CompletableFuture<Integer>?cf1?=?CompletableFuture.supplyAsync(()?->?{
??Thread.sleep(3000);
??return?1;
},?executorService);
CompletableFuture<Integer>?cf2?=?CompletableFuture.supplyAsync(()?->?{
??Thread.sleep(3000);
??return?2;
},?executorService);
Integer?join1?=?cf1.thenApply((cf1Val)?->?{
??System.out.println("cf1?start?value:"?+?cf1Val);
??Integer?cf2Val?=?cf2.join();
??System.out.println("cf2?end?value:"?+?cf2Val);
??return?3;
}).join();
//output
cf1?start?value:1
cf2?end?value:2
代碼很簡(jiǎn)單,有一個(gè)線程數(shù)為 2 的線程池,cf1、cf2 都使用這個(gè)線程執(zhí)行異步任務(wù),特別的是在 cf1.thenApply 中會(huì)調(diào)用 cf2.join(),當(dāng)線程數(shù)是2的時(shí)候可以順利輸出。
ExecutorService?executorService?=?Executors.newFixedThreadPool(1);
CompletableFuture<Integer>?cf1?=?CompletableFuture.supplyAsync(()?->?{
??Thread.sleep(3000);
??return?1;
},?executorService);
CompletableFuture<Integer>?cf2?=?CompletableFuture.supplyAsync(()?->?{
??Thread.sleep(3000);
??return?2;
},?executorService);
?Integer?join1?=?cf1.thenApply((cf1Val)?->?{
??System.out.println("cf1?start?value:"?+?cf1Val);
??Integer?cf2Val?=?cf2.join();
??System.out.println("cf2?end?value:"?+?cf2Val);
??return?3;
}).join();
//output
cf1?start?value:1
這時(shí)候我們將線程池的線程數(shù)調(diào)整為 1,這時(shí)只會(huì)輸出 cf1 start value:1,然后就一直阻塞。
使用 jstack -l pid 查看線程狀態(tài),發(fā)現(xiàn)是 WAITING,等待的地方正是我們?cè)诖a里調(diào)用的cf2.join():
"pool-1-thread-1"?#11?prio=5?os_prio=31?tid=0x00000001429f5000?nid=0xa903?waiting?on?condition
???java.lang.Thread.State:?WAITING?(parking)
?at?sun.misc.Unsafe.park(Native?Method)
?-?parking?to?wait?for??<0x000000076ba5f7d0>?(a?java.util.concurrent.CompletableFuture$Signaller)
?at?java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
?at?java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
?at?java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
?at?java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
?at?java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
?at?com.ppphuang.demo.threadPool.ExecutorsTest.lambda$main$2(ThreadPoolExecutorsTest.java:34)
原因是我們?cè)谖ㄒ灰粋€(gè)線程中調(diào)用 cf2.join(),阻塞等待 cf2 完成,但是 cf2 需要等待 cf1 完成之后才有空閑線程去執(zhí)行。這就類似于你右手正拿著一個(gè)水杯,然后等待右手拿水壺倒?jié)M水,這是不可能完成的。所以盡量不要嵌套join,不注意隔離線程池的話很容易造成’死鎖‘(線程阻塞)。
CF 常用 API
| API | 描述 |
|---|---|
| supplyAsync | 開啟異步任務(wù),到另一個(gè)線程執(zhí)行,異步任務(wù)有返回值。 |
| complete | 完成任務(wù)。 |
| completeExceptionally | 異常結(jié)束任務(wù)。 |
| thenCombine | 合并任務(wù),兩個(gè)任務(wù)同時(shí)執(zhí)行,結(jié)果由合并函數(shù) BiFunction 返回。 |
| thenApply | 任務(wù)后置處理。 |
| applyToEither | 會(huì)取兩個(gè)任務(wù)最先完成的任務(wù),上個(gè)任務(wù)和這個(gè)任務(wù)同時(shí)進(jìn)行,哪個(gè)先結(jié)束,先用哪個(gè)結(jié)果。 |
| handle | 后續(xù)處理。 |
| whenComplete | 完成后的處理。 |
| allOf | 等待所有異步線程任務(wù)結(jié)束。 |
| join | 獲取返回值,沒有complete的 CF 對(duì)象調(diào)用join時(shí),會(huì)等待complete再返回,已經(jīng) complete的 CF 對(duì)象調(diào)用join時(shí),會(huì)立刻返回結(jié)果。 |
優(yōu)化過程
異步 RPC 客戶端
我們手寫的這個(gè) RPC 框架支持異步調(diào)用,如果你想看具體的實(shí)現(xiàn),可以在文末找到源碼鏈接。異步調(diào)用之前會(huì)設(shè)置一個(gè) CallBack 方法,異步調(diào)用時(shí)會(huì)直接返回 null,不會(huì)等待服務(wù)端返回接果,服務(wù)端返回結(jié)果之后會(huì)通過 RPC 客戶端自帶的線程池執(zhí)行設(shè)置的 CallBack 方法。
RPC 異步調(diào)用圖示:
RPC 異步調(diào)用包裝異步RPC Client
通過 AsyncExecutor 包裝 RPC的客戶端,AsyncExecutor 類中的 client 屬性值為創(chuàng)建的某個(gè) RPC 服務(wù)的異步客戶端代理類,這個(gè)代理類在構(gòu)造方法中創(chuàng)建并賦值給 client 屬性。
類中的 async 方法接受 Function 類型的參數(shù) function,可以通過 function.apply(client) 來通過 client 執(zhí)行真正的 RPC 調(diào)用。
在 ?async 方法中實(shí)例化一個(gè) CompletableFuture, 并將 CompletableFuture 作為異步回調(diào)的上下文設(shè)置到 RPC 的異步回調(diào)中,之后將該 CompletableFuture 返回給調(diào)用者。
public?class?AsyncExecutor<C>?{
????private?C?client;
????public?AsyncExecutor(ClientProxyFactory?clientProxyFactory,?Class<C>?clazz,?String?group,?String?version)?{
????????this.client?=?clientProxyFactory.getProxy(clazz,?group,?version,?true);
????}
????public?<R>?CompletableFuture<R>?async(Function<C,?R>?function)?{
????????CompletableFuture<R>?future?=?new?CompletableFuture<>();
????????ClientProxyFactory.setLocalAsyncContextAndAsyncReceiveHandler(future,?CompletableFutureAsyncCallBack.instance());
????????try?{
????????????function.apply(client);
????????}?catch?(Exception?e)?{
????????????future.completeExceptionally(e);
????????}
????????return?future;
????}
}
異步回調(diào)類
public?class?CompletableFutureAsyncCallBack?extends?AsyncReceiveHandler?{
????private?static?volatile?CompletableFutureAsyncCallBack?INSTANCE;
????private?CompletableFutureAsyncCallBack()?{
????}
????@Override
????public?void?callBack(Object?context,?Object?result)?{
????????if?(!(context?instanceof?CompletableFuture))?{
????????????throw?new?IllegalStateException("the?context?must?be?CompletableFuture");
????????}
????????CompletableFuture?future?=?(CompletableFuture)?context;
????????if?(result?instanceof?Throwable)?{
????????????future.completeExceptionally((Throwable)?result);
????????????return;
????????}
????????log.info("result:{}",?result);
????????future.complete(result);
????}
}
AsyncReceiveHandler 是 RPC 的異步回調(diào)抽象類,類中的 callBack、onException 抽象方法需要子類實(shí)現(xiàn)。
CompletableFutureAsyncCallBack 實(shí)現(xiàn)了這個(gè) callBack 抽象方法,第一個(gè)參數(shù)是我們?cè)诎b異步 RPC Client 時(shí)設(shè)置的 ?CompletableFuture 上下文,第二個(gè)參數(shù)是 RPC 返回的結(jié)果。方法中判斷 ?RPC 返回的結(jié)果是否異常,若異常通過 completeExceptionally 異常結(jié)束這個(gè) CompletableFuture,若正常通過 complete 正常結(jié)束這個(gè) CompletableFuture。
注冊(cè)異步客戶端Bean
@Component
public?class?AsyncExecutorConfig?{
????@Autowired
????ClientProxyFactory?clientProxyFactory;
????@Bean
????public?AsyncExecutor<DemoService>?demoServiceAsyncExecutor()?{
????????return?new?AsyncExecutor<>(clientProxyFactory,?DemoService.class,?"",?"");
????}
}
異步 RPC 調(diào)用
@Autowired
AsyncExecutor<DemoService>?demoServiceAsyncExecutor;
CompletableFuture<String>?pppName?=?demoServiceAsyncExecutor.async(service?->?service.hello("ppp"));
String?name?=?pppName.join();
異步HTTP WebClient
WebClient 是從 Spring WebFlux 5.0 版本開始提供的一個(gè)非阻塞的基于響應(yīng)式編程的進(jìn)行 HTTP 請(qǐng)求的客戶端工具。它的響應(yīng)式編程的基于 Reactor 的。
WebClient VS RestTemplate
WebClient的優(yōu)勢(shì)在于:
- 非阻塞響應(yīng)式 IO,單位時(shí)間內(nèi)有限資源下支持更高的并發(fā)量。
- 支持使用 Java8 Lambda 表達(dá)式函數(shù)。
- 支持同步、異步、Stream 流式傳輸。
WebClient 使用
public?CompletableFuture<String>?asyncHttp(String?url)?{
????????WebClient?localhostWebClient?=?WebClient.builder().baseUrl("http://localhost:8080").build();
????????Mono<HttpResult<String>>?userMono?=?localhostWebClient.method(HttpMethod.GET).uri(url)
????????????????.retrieve()
????????????????.bodyToMono(new?ParameterizedTypeReference<HttpResult<String>>()?{})
????????????????//異常處理?有onErrorReturn時(shí)doOnError不會(huì)觸發(fā),所以不需要后續(xù)在CompletableFuture中handle處理異常
????????????????//如果不使用onErrorReturn,建議在后續(xù)CompletableFuture中handle處理異常
????????????????.onErrorReturn(new?HttpResult<>(201,?"default",?"default?hello"))
????????????????//超時(shí)處理
????????????????.timeout(Duration.ofSeconds(3))
????????????????//返回值過濾
????????????????.filter(httpResult?->?httpResult.code?==?200)
????????????????//默認(rèn)值
????????????????.defaultIfEmpty(new?HttpResult<>(201,?"defaultIfEmpty",?"defaultIfEmpty?hello"))
????????????????//失敗重試
????????????????.retryWhen(Retry.backoff(1,?Duration.ofSeconds(1)));
????????CompletableFuture<HttpResult<String>>?stringCompletableFuture?=?WebClientFutureFactory.getCompletableFuture(userMono);
????????return?stringCompletableFuture.thenApply(HttpResult::getData);
????}
WebClient 整合 CF
WebClientFutureFactory.getCompletableFuture 方法會(huì)把 WebClient 返回的結(jié)果組裝成 CompletableFuture ,使用的是 Mono 類的 doOnError 和 subscribe 方法,當(dāng)正常返回時(shí)通過 subscribe 來調(diào)用 completableFuture.complete,當(dāng)異常時(shí)通過 doOnError 來調(diào)用 completableFuture.completeExceptionally:
public?class?WebClientFutureFactory?{
????public?static?<T>?CompletableFuture<T>?getCompletableFuture(Mono<T>?mono)?{
????????CompletableFuture<T>?completableFuture?=?new?CompletableFuture<>();
????????mono.doOnError(throwable?->?{
????????????completableFuture.completeExceptionally(throwable);
????????????log.error("mono.doOnError?throwable:{}",?throwable.getMessage());
????????}).subscribe(result?->?{
????????????completableFuture.complete(result);
????????????log.debug("mono.subscribe?execute?thread:?{}",?Thread.currentThread().getName());
????????});
????????return?completableFuture;
????}
}
WebClient 對(duì)同一服務(wù)的多次調(diào)用:
public?Flux<User>?fetchUsers(List<Integer>?userIds)?{
????return?Flux.fromIterable(userIds)
????????.parallel()
????????.flatMap(this::getUser)
????????.ordered((u1,?u2)?->?u2.id()?-?u1.id());
}
對(duì)返回相同類型的不同服務(wù)進(jìn)行多次調(diào)用:
public?Flux<User>?fetchUserAndOtherUser(int?id)?{
????return?Flux.merge(getUser(id),?getOtherUser(id))
????????.parallel()
????????.runOn(Schedulers.elastic())
????????.ordered((u1,?u2)?->?u2.id()?-?u1.id());
}
對(duì)不同類型的不同服務(wù)的多次調(diào)用:
public?Mono?fetchUserAndItem(int?userId,?int?itemId)?{
????Mono<User>?user?=?getUser(userId).subscribeOn(Schedulers.elastic());
????Mono<Item>?item?=?getItem(itemId).subscribeOn(Schedulers.elastic());
?
????return?Mono.zip(user,?item,?UserWithItem::new);
}
異步數(shù)據(jù)庫(kù)調(diào)用
使用 CompletableFuture.supplyAsync 執(zhí)行異步任務(wù)時(shí),必須指定成自己的線程池,否則 CompletableFuture 會(huì)使用默認(rèn)的線程池 ForkJoinPool,默認(rèn)線程池?cái)?shù)量為 cpus - 1:
WebClient<Boolean>?dbFuture?=?CompletableFuture.supplyAsync(()?->?getDb(id),?ThreadPoolConfig.ASYNC_TASK_EXECUTOR);
編排 CF
構(gòu)造了所有需要異步執(zhí)行的 CompletableFuture 之后,使用 allOf 方法阻塞等待所有的 CompletableFuture 結(jié)果,allOf 響應(yīng)之后可以通過 join 獲取各個(gè) CompletableFuture 的響應(yīng)接口,這里的 join 是會(huì)立刻返回的,不會(huì)阻塞:
//RPC?的?CompletableFuture
CompletableFuture<String>?pppName?=?demoServiceAsyncExecutor.async(service?->?service.hello("ppp"));
//RPC?的?CompletableFuture
CompletableFuture<String>?huangName?=?demoServiceAsyncExecutor.async(service?->?service.hello("huang"));
//DB?操作的?CompletableFuture
WebClient<Boolean>?dbFuture?=?CompletableFuture.supplyAsync(()?->?getDb(id),?ThreadPoolConfig.ASYNC_TASK_EXECUTOR);
//allOf?方法阻塞等待所有的?CompletableFuture?結(jié)果?????
return?CompletableFuture.allOf(pppName,?huangName,?dbFuture)
?????//組裝結(jié)果返回
????????.thenApply(r?->?pppName.join()?&&?huangName.join()?&&?dbFuture.join()).join();
超時(shí)處理
java9 中 CompletableFuture 才有超時(shí)處理,使用方法如下:
CompletableFuture.supplyAsync(()?->?6?/?3).orTimeout(1,?TimeUnit.SECONDS);
java8 中需要配合 ScheduledExecutorService + applyToEither:
public?class?TimeoutUtils?{
????private?static?final?ScheduledExecutorService?scheduledExecutor?=?Executors.newScheduledThreadPool(1);
????static?{
????????Runtime.getRuntime().addShutdownHook(new?Thread(scheduledExecutor::shutdownNow));
????}
????public?static?<T>?CompletableFuture<T>?timeout(CompletableFuture<T>?cf,?long?timeout,?TimeUnit?unit)?{
????????CompletableFuture<T>?result?=?new?CompletableFuture<>();
????????scheduledExecutor.schedule(()?->?result.completeExceptionally(new?TimeoutException()),?timeout,?unit);
????????return?cf.applyToEither(result,?Function.identity());
????}
}
TimeoutUtils 類與有一個(gè)靜態(tài)屬性,值為初始化的一個(gè) ScheduledExecutorService ,還有一個(gè)靜態(tài)方法 timeout ,這個(gè)方法將傳入的 cf ?用 ?applyToEither 接口與一個(gè)調(diào)度計(jì)時(shí)的 CompletableFuture 組合,哪個(gè) CompletableFuture 先執(zhí)行完成,就返回哪個(gè)的結(jié)果。
具體使用如下:
CompletableFuture<Integer>?future?=?demoServiceAsyncExecutor.async(service?->?service.getAge(18));
CompletableFuture<Integer>?futureWithTimeout?=?TimeoutUtils.timeout(future,?3,?TimeUnit.SECONDS);
futureWithTimeout.join();
異常與默認(rèn)值處理
CompletableFuture 中可以處理異常有下面三個(gè) API :
public?<U>?CompletableFuture<U>?handle(java.util.function.BiFunction<??super?T,?Throwable,???extends?U>?fn)
handle 接口不論 CompletableFuture 執(zhí)行成功還是異常都會(huì)被處罰,handle 接受一個(gè) BiFunction 參數(shù),BiFunction 中的第一個(gè)參數(shù)為 CompletableFuture 的結(jié)果,另一個(gè)參數(shù)為 CompletableFuture 執(zhí)行過程中的異常,Handle可以返回任意類型的值??梢越o handle 傳入自定義函數(shù),根據(jù)結(jié)果跟執(zhí)行異常返回最終數(shù)據(jù)。
public?CompletableFuture<T>?whenComplete(java.util.function.BiConsumer<??super?T,???super?Throwable>?action)
whenComplete 接口與 handle 類似,whenComplete 接受一個(gè) BiConsumer 參數(shù),BiConsumer 中的第一個(gè)參數(shù)為 CompletableFuture 的結(jié)果,另一個(gè)參數(shù)為 CompletableFuture 執(zhí)行過程中的異常,但是沒有返回值。
public?CompletableFuture<T>?exceptionally(java.util.function.Function<Throwable,???extends?T>?fn)
exceptionally 接口只有在執(zhí)行異常的時(shí)候才會(huì)被觸發(fā),接受一個(gè) Function 參會(huì), Function 只有一個(gè)參數(shù)為 CompletableFuture 執(zhí)行過程中的異常,可以有一個(gè)任意返回值。
下表是三個(gè)接口的對(duì)比:
| handle() | whenComplete() | exceptionly() | |
|---|---|---|---|
| 訪問成功 | Yes | Yes | No |
| 訪問失敗 | Yes | Yes | Yes |
| 能從失敗中恢復(fù) | Yes | No | Yes |
| 能轉(zhuǎn)換結(jié)果從T 到 U | Yes | No | No |
| 成功時(shí)觸發(fā) | Yes | Yes | No |
| 失敗時(shí)觸發(fā) | Yes | Yes | Yes |
| 有異步版本 | Yes | Yes | Yes |
我們使用 handle 接口來處理異常與默認(rèn)值,下面是封裝的一個(gè) handle 接口入?yún)ⅲ?/p>
public?class?DefaultValueHandle<R>?extends?AbstractLogAction<R>?implements?BiFunction<R,?Throwable,?R>?{
???public?DefaultValueHandle(boolean?isNullToDefault,?R?defaultValue,?String?methodName,?Object...?args)?{
????????super(methodName,?args);
????????this.defaultValue?=?defaultValue;
????????this.isNullToDefault?=?isNullToDefault;
????}
???@Override
????public?R?apply(R?result,?Throwable?throwable)?{
????????logResult(result,?throwable);
????????if?(throwable?!=?null)?{
????????????return?defaultValue;
????????}
????????if?(result?==?null?&&?isNullToDefault)?{
????????????return?defaultValue;
????????}
????????return?result;
????}
}
這個(gè)類實(shí)現(xiàn)了 handle 接口需要的 BiFunction 類型,在構(gòu)造方法中有四個(gè)參數(shù) boolean isNullToDefault, R defaultValue, String methodName, Object... args 第一個(gè)參數(shù)是決定執(zhí)行結(jié)果為空值時(shí),是否將我們傳進(jìn)來的第二個(gè)參數(shù)作為默認(rèn)值返回。當(dāng)異常時(shí)也會(huì)將第二個(gè)參數(shù)作為默認(rèn)返回值。最后兩個(gè)參數(shù)一個(gè)是方法名稱,一個(gè)是調(diào)用參數(shù),可以給父類用作日志記錄。
與 CompletableFuture 配合使用如下:
CompletableFuture<String>?pppName?=?demoServiceAsyncExecutor.async(service?->?service.hello("ppp"))
????????.handle(new?DefaultValueHandle<>(true,?"name",?"service.hello",?"ppp"));
日志
封裝了一個(gè)實(shí)現(xiàn) BiConsumer 的 LogErrorAction 類,父類有個(gè)抽象類 AbstractLogAction 這個(gè)類就是簡(jiǎn)單使用 logReslut 方法記錄日志,可以自己隨意實(shí)現(xiàn):
public?class?LogErrorAction<R>?extends?AbstractLogAction<R>?implements?BiConsumer<R,?Throwable>{
??@Override
????public?void?accept(R?result,?Throwable?throwable)?{
????????logResult(result,?throwable);
????}
}
與 CompletableFuture 配合使用如下:
CompletableFuture<String>?pppName?=?demoServiceAsyncExecutor.async(service?->?service.hello("ppp"))
??.whenComplete(
??new?LogErrorAction<>("hello",?"ppp")
?);
優(yōu)化效果
優(yōu)化前接口平均響應(yīng)耗時(shí) 350ms,優(yōu)化后平均響應(yīng)耗時(shí) 180ms,下降 49% 左右。
最佳實(shí)踐
- 禁止嵌套 join,避免“死鎖”(線程阻塞)。
- 多個(gè) CompletableFuture 聚合時(shí)建議使用 allOf。
- HTTP 使用無阻塞的 Spring webclient,避免自定義線程池線程阻塞。
- 使用 RPC 或者 HTTP 異步調(diào)用生成的 CompletableFuture, 后續(xù)的 thenAppply,handle 等禁止耗時(shí)操作,避免阻塞異步框架線程池。
- 禁止使用 CompletableFuture 的默認(rèn)線程池,不同任務(wù)自定義線程池,不同級(jí)別業(yè)務(wù)線程池隔離,根據(jù)測(cè)試情況設(shè)置線程數(shù),隊(duì)列長(zhǎng)度,拒絕策略。
- 異步執(zhí)行的操作都加上超時(shí),CF 超時(shí)后不會(huì)終止線程中的超時(shí)任務(wù),不設(shè)置超時(shí)可能導(dǎo)致線程長(zhǎng)時(shí)間阻塞。
- 建議使用異常、默認(rèn)值、空值替換、錯(cuò)誤日志等工具記錄信息,方便排查問題。
示例代碼
RPC 框架代碼:https://github.com/PPPHUANG/rpc-spring-starter
異步調(diào)用demo:https://github.com/PPPHUANG/rpc-spring-starter-demo
總結(jié)
- 為什么需要異步調(diào)用
- CompletableFuture 基本使用
- RPC 異步調(diào)用
- HTTP 異步調(diào)用
- 編排 CompletableFuture 提高吞吐量
往期精彩文章:
記一次線上RPC超時(shí)故障排查及后續(xù)GC調(diào)優(yōu)思路
