1. Java 異步調(diào)用原理與實(shí)戰(zhàn)經(jīng)驗(yàn)總結(jié)

        共 19084字,需瀏覽 39分鐘

         ·

        2022-08-13 19:28

        本文介紹了線上業(yè)務(wù)中的一些異步調(diào)用實(shí)踐經(jīng)驗(yàn),包含 ?IO 模型介紹、CompletableFuture 的基本使用、RPC 異步調(diào)用、異步 HTTP 客戶端 Spring WebClient 的使用等。RPC 使用前文介紹的手寫 RPC 框架,該框架支持異步調(diào)用。

        本文要點(diǎn):

        • 為什么需要異步調(diào)用
        • CompletableFuture 基本使用
        • RPC 異步調(diào)用
        • HTTP 異步調(diào)用
        • 編排 CompletableFuture 提高吞吐量

        為什么異步

        BIO 模型

        首先我們先回顧一下 BIO 模型:

        aaf682e3da8497e71c1300b7b3c7ac3a.webpBIO 模型

        當(dāng)用戶進(jìn)程調(diào)用了recvfrom 這個(gè)系統(tǒng)調(diào)用,kernel 就開始了 IO 的第一個(gè)階段:準(zhǔn)備數(shù)據(jù)。對(duì)于 network io 來說,很多時(shí)候數(shù)據(jù)在一開始還沒有到達(dá)(比如,還沒有收到一個(gè)完整的UDP包),這個(gè)時(shí)候 kernel 就要等待足夠的數(shù)據(jù)到來。而在用戶進(jìn)程這邊,整個(gè)進(jìn)程會(huì)被阻塞。當(dāng) kernel 一直等到數(shù)據(jù)準(zhǔn)備好了,它就會(huì)將數(shù)據(jù)從 kernel 中拷貝到用戶內(nèi)存,然后 kernel 返回結(jié)果,用戶進(jìn)程才解除 block 的狀態(tài),重新運(yùn)行起來。所以,Blocking IO 的特點(diǎn)就是在 IO 執(zhí)行的兩個(gè)階段都被 block 了。

        同步調(diào)用

        52ed66f01de1f831b09b2adb2abb25aa.webp同步調(diào)用

        在同步調(diào)用的場(chǎng)景下,依次請(qǐng)求多個(gè)接口,耗時(shí)長(zhǎng)、性能差,接口響應(yīng)時(shí)長(zhǎng) T > T1+T2+T3+……+Tn。

        減少同步等待

        一般這個(gè)時(shí)候?yàn)榱藴p少同步等待時(shí)間,會(huì)使用線程池來同時(shí)處理多個(gè)任務(wù),接口的響應(yīng)時(shí)間就是 MAX(T1,T2,T3):

        fc88e4e05b949e03041dace560ad6295.webp線程池異步

        大概代碼如下:

        Future<String>?future?=?executorService.submit(()?->?{
        ??Thread.sleep(2000);
        ??return?"hello?world";
        });
        while?(true)?{
        ??if?(future.isDone())?{
        ????System.out.println(future.get());
        ????break;
        ??}
        }

        同步模型中使用線程池確實(shí)能實(shí)現(xiàn)異步調(diào)用的效果,也能壓縮同步等待的時(shí)間,但是也有一些缺陷:

        • CPU 資源大量浪費(fèi)在阻塞等待上,導(dǎo)致 CPU 資源利用率低。

        • 為了增加并發(fā)度,會(huì)引入更多額外的線程池,隨著 CPU 調(diào)度線程數(shù)的增加,會(huì)導(dǎo)致更嚴(yán)重的資源爭(zhēng)用,上下文切換占用 CPU 資源。

        • 線程池中的線程都是阻塞的,硬件資源無法充分利用,系統(tǒng)吞吐量容易達(dá)到瓶頸。

        NIO 模型

        為了解決 BIO 中的缺陷,引入 NIO 模型:

        219112e7e90e340325ec24d896eab385.webpNIO 模型

        當(dāng)用戶進(jìn)程發(fā)出 read 操作時(shí),如果 kernel 中的數(shù)據(jù)還沒有準(zhǔn)備好,那么它并不會(huì) block 用戶進(jìn)程,而是立刻返回一個(gè) error。從用戶進(jìn)程角度講 ,它發(fā)起一個(gè) read 操作后,并不需要等待,而是馬上就得到了一個(gè)結(jié)果。用戶進(jìn)程判斷結(jié)果是一個(gè) error 時(shí),它就知道數(shù)據(jù)還沒有準(zhǔn)備好,于是它可以再次發(fā)送 read 操作。一旦 kernel 中的數(shù)據(jù)準(zhǔn)備好了,并且又再次收到了用戶進(jìn)程的 system call,那么它馬上就將數(shù)據(jù)拷貝到了用戶內(nèi)存,然后返回。所以,用戶進(jìn)程其實(shí)是需要不斷的主動(dòng)詢問 kernel 數(shù)據(jù)好了沒有。

        異步優(yōu)化思路

        我們知道了 NIO 的調(diào)用方式比 BIO 好,那我們?cè)趺茨茉跇I(yè)務(wù)編碼中使用到 NIO 呢?自己動(dòng)手將 BIO 替換成 NIO 肯定不現(xiàn)實(shí),已有組件支持 NIO 的可以直接使用,不支持的繼續(xù)使用自定義線程池。

        • 通過 RPC NIO 異步調(diào)用、 HTTP 異步調(diào)用的方式降低線程數(shù),從而降低調(diào)度(上下文切換)開銷。
        • 沒有原生支持 NIO 異步調(diào)用的繼續(xù)使用線程池。
        • 引入 CompletableFuture 對(duì)業(yè)務(wù)流程進(jìn)行編排,降低依賴之間的阻塞。

        簡(jiǎn)述CompletableFuture

        CompletableFuture 是 java.util.concurrent 庫(kù)在 java 8 中新增的主要工具,同傳統(tǒng)的 Future 相比,其支持流式計(jì)算、函數(shù)式編程、完成通知、自定義異常處理等很多新的特性。

        常用 API 舉例

        supplyAsync
        CompletableFuture<String>?future?=?CompletableFuture.supplyAsync(()->{????
        ??try{
        ????Thread.sleep(1000L);
        ????return?"hello?world";
        ??}?catch?(Exception?e){
        ????return?"failed";
        ??}
        });
        System.out.println(future.join());
        //?output
        hello?world

        開啟異步任務(wù),到另一個(gè)線程執(zhí)行。

        complete
        CompletableFuture<String>?future1?=?new?CompletableFuture<>();
        future.complete("hello?world");?????//異步線程執(zhí)行
        future.whenComplete((res,?throwable)?->?{
        ??System.out.println(res);
        });
        System.out.println(future1.join());
        CompletableFuture<String>?future2?=?new?CompletableFuture<>();
        future.completeExceptionally(new?Throwable("failed"));?//異步線程執(zhí)行
        System.out.println(future2.join());
        //?output
        hello?world
        hello?world
        ??
        Exception?in?thread?"main"?
        java.util.concurrent.CompletionException:?
        java.lang.Throwable:?failed

        complete 正常完成該 CompletableFuture。

        completeExceptionally 異常完成該 CompletableFuture。

        thenApply
        String?original?=?"Message";
        CompletableFuture<String>?cf?=?
        ?CompletableFuture.completedFuture(original).thenApply(String::toUpperCase);
        System.out.println(cf.join());
        //?output
        MESSAGE

        任務(wù)后置處理。

        圖示:

        b10aac49a5f907caf7e8d501bafe6714.webpthenApply 圖示
        thenCombine
        CompletableFuture<String>?cf?=?
        ?CompletableFuture.completedFuture("Message").thenApply(String::toUpperCase);
        CompletableFuture<String>?cf1?=?
        ?CompletableFuture.completedFuture("Message").thenApply(String::toLowerCase);
        CompletableFuture<String>?allCf?=?cf.thenCombine(cf1,?(s1,?s2)?->?s1?+?s2);
        System.out.println(allCf.join());
        //?output
        MSGmsg

        合并任務(wù),兩個(gè)任務(wù)同時(shí)執(zhí)行,結(jié)果由合并函數(shù) BiFunction 返回。

        圖示:

        7960c684fb9c0fd7c933bc2e6f9029b0.webpthenCombine 圖示
        allOf
        CompletableFuture<String>?future1?=?CompletableFuture.supplyAsync(()?->?"Message1");
        CompletableFuture<String>?future2?=?CompletableFuture.supplyAsync(()?->?"Message2");
        CompletableFuture<String>?future3?=?CompletableFuture.supplyAsync(()?->?"Message3");
        CompletableFuture<String>?future?=?
        ?CompletableFuture.allOf(future1,?future2,?future3).thenApply(v?->?{
        ??String?join1?=?future1.join();
        ??String?join2?=?future2.join();
        ??String?join3?=?future3.join();
        ??return?join1?+?join2?+?join3;});
        System.out.println(future.join());
        //?output
        Msg1Msg2Msg3

        allOf 會(huì)阻塞等待所有異步線程任務(wù)結(jié)束。

        allOf 里的 join 并不會(huì)阻塞,傳給 thenApply 的函數(shù)是在 future1, future2, future3 全部完成時(shí),才會(huì)執(zhí)行 。

        圖示:

        ad5a81bd2d6d1b380ef65cb34411a1d7.webpallOf 圖示

        CF 執(zhí)行線程

        下面有兩個(gè)小demo,可以先試著想想輸出的結(jié)果:

        String?original?=?"Message";
        CompletableFuture?cf?=?CompletableFuture.supplyAsync(()?->?{
        ??System.out.println("supplyAsync?thread:?"?+?Thread.currentThread().getName());
        ??return?original;
        }).thenApply(r?->?{
        ??System.out.println("thenApply?thread:?"?+?Thread.currentThread().getName());
        ??return?r;
        });
        System.out.println(cf.join());
        //?output
        supplyAsync?thread:?ForkJoinPool.commonPool-worker-1
        thenApply?thread:?main
        Message
        String?original?=?"Message";
        CompletableFuture?cf?=?CompletableFuture.supplyAsync(()?->?{
        ??System.out.println("supplyAsync?thread:?"?+?Thread.currentThread().getName());
        ??try?{
        ????Thread.sleep(100);
        ??}?catch?(InterruptedException?e)?{
        ????throw?new?RuntimeException(e);
        ??}
        ??return?original;
        }).thenApply(r?->?{
        ??System.out.println("thenApply?thread:?"?+?Thread.currentThread().getName());
        ??return?r;
        });
        System.out.println(cf.join());
        //?output
        supplyAsync?thread:?ForkJoinPool.commonPool-worker-1
        thenApply?thread:?ForkJoinPool.commonPool-worker-1
        Message

        先看結(jié)論:

        • 執(zhí)行 complete 的線程會(huì)執(zhí)行當(dāng)前調(diào)用鏈上的所有CF。
        • 如果 CF 提前 complete,后續(xù) CF 由初始線程執(zhí)行。

        異步任務(wù)里沒有 sleep 的時(shí)候,異步任務(wù)很快就會(huì)完成,意味著 JVM 執(zhí)行到 thenApply 的時(shí)候,前置 CF 已經(jīng)提前完成所以后續(xù)的 CF 會(huì)被初始線程 main 線程執(zhí)行。

        異步任務(wù)里有 sleep 的時(shí)候, JVM 執(zhí)行到 thenApply 時(shí),前置 CF 還沒有完成,前置 CF complete 的線程會(huì)執(zhí)行所有后續(xù)的 CF。

        CF 嵌套join

        ExecutorService?executorService?=?Executors.newFixedThreadPool(2);
        CompletableFuture<Integer>?cf1?=?CompletableFuture.supplyAsync(()?->?{
        ??Thread.sleep(3000);
        ??return?1;
        },?executorService);
        CompletableFuture<Integer>?cf2?=?CompletableFuture.supplyAsync(()?->?{
        ??Thread.sleep(3000);
        ??return?2;
        },?executorService);
        Integer?join1?=?cf1.thenApply((cf1Val)?->?{
        ??System.out.println("cf1?start?value:"?+?cf1Val);
        ??Integer?cf2Val?=?cf2.join();
        ??System.out.println("cf2?end?value:"?+?cf2Val);
        ??return?3;
        }).join();
        //output
        cf1?start?value:1
        cf2?end?value:2

        代碼很簡(jiǎn)單,有一個(gè)線程數(shù)為 2 的線程池,cf1、cf2 都使用這個(gè)線程執(zhí)行異步任務(wù),特別的是在 cf1.thenApply 中會(huì)調(diào)用 cf2.join(),當(dāng)線程數(shù)是2的時(shí)候可以順利輸出。

        ExecutorService?executorService?=?Executors.newFixedThreadPool(1);
        CompletableFuture<Integer>?cf1?=?CompletableFuture.supplyAsync(()?->?{
        ??Thread.sleep(3000);
        ??return?1;
        },?executorService);
        CompletableFuture<Integer>?cf2?=?CompletableFuture.supplyAsync(()?->?{
        ??Thread.sleep(3000);
        ??return?2;
        },?executorService);
        ?Integer?join1?=?cf1.thenApply((cf1Val)?->?{
        ??System.out.println("cf1?start?value:"?+?cf1Val);
        ??Integer?cf2Val?=?cf2.join();
        ??System.out.println("cf2?end?value:"?+?cf2Val);
        ??return?3;
        }).join();
        //output
        cf1?start?value:1

        這時(shí)候我們將線程池的線程數(shù)調(diào)整為 1,這時(shí)只會(huì)輸出 cf1 start value:1,然后就一直阻塞。

        使用 jstack -l pid 查看線程狀態(tài),發(fā)現(xiàn)是 WAITING,等待的地方正是我們?cè)诖a里調(diào)用的cf2.join()

        "pool-1-thread-1"?#11?prio=5?os_prio=31?tid=0x00000001429f5000?nid=0xa903?waiting?on?condition
        ???java.lang.Thread.State:?WAITING?(parking)
        ?at?sun.misc.Unsafe.park(Native?Method)
        ?-?parking?to?wait?for??<0x000000076ba5f7d0>?(a?java.util.concurrent.CompletableFuture$Signaller)
        ?at?java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        ?at?java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
        ?at?java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
        ?at?java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
        ?at?java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
        ?at?com.ppphuang.demo.threadPool.ExecutorsTest.lambda$main$2(ThreadPoolExecutorsTest.java:34)

        原因是我們?cè)谖ㄒ灰粋€(gè)線程中調(diào)用 cf2.join(),阻塞等待 cf2 完成,但是 cf2 需要等待 cf1 完成之后才有空閑線程去執(zhí)行。這就類似于你右手正拿著一個(gè)水杯,然后等待右手拿水壺倒?jié)M水,這是不可能完成的。所以盡量不要嵌套join,不注意隔離線程池的話很容易造成’死鎖‘(線程阻塞)。

        CF 常用 API

        API描述
        supplyAsync開啟異步任務(wù),到另一個(gè)線程執(zhí)行,異步任務(wù)有返回值。
        complete完成任務(wù)。
        completeExceptionally異常結(jié)束任務(wù)。
        thenCombine合并任務(wù),兩個(gè)任務(wù)同時(shí)執(zhí)行,結(jié)果由合并函數(shù) BiFunction 返回。
        thenApply任務(wù)后置處理。
        applyToEither會(huì)取兩個(gè)任務(wù)最先完成的任務(wù),上個(gè)任務(wù)和這個(gè)任務(wù)同時(shí)進(jìn)行,哪個(gè)先結(jié)束,先用哪個(gè)結(jié)果。
        handle后續(xù)處理。
        whenComplete完成后的處理。
        allOf等待所有異步線程任務(wù)結(jié)束。
        join獲取返回值,沒有complete的 CF 對(duì)象調(diào)用join時(shí),會(huì)等待complete再返回,已經(jīng) complete的 CF 對(duì)象調(diào)用join時(shí),會(huì)立刻返回結(jié)果。

        優(yōu)化過程

        異步 RPC 客戶端

        我們手寫的這個(gè) RPC 框架支持異步調(diào)用,如果你想看具體的實(shí)現(xiàn),可以在文末找到源碼鏈接。異步調(diào)用之前會(huì)設(shè)置一個(gè) CallBack 方法,異步調(diào)用時(shí)會(huì)直接返回 null,不會(huì)等待服務(wù)端返回接果,服務(wù)端返回結(jié)果之后會(huì)通過 RPC 客戶端自帶的線程池執(zhí)行設(shè)置的 CallBack 方法。

        RPC 異步調(diào)用圖示:

        8ec1d2451c71fadb53441fa1840a7c24.webpRPC 異步調(diào)用
        包裝異步RPC Client

        通過 AsyncExecutor 包裝 RPC的客戶端,AsyncExecutor 類中的 client 屬性值為創(chuàng)建的某個(gè) RPC 服務(wù)的異步客戶端代理類,這個(gè)代理類在構(gòu)造方法中創(chuàng)建并賦值給 client 屬性。

        類中的 async 方法接受 Function 類型的參數(shù) function,可以通過 function.apply(client) 來通過 client 執(zhí)行真正的 RPC 調(diào)用。

        在 ?async 方法中實(shí)例化一個(gè) CompletableFuture, 并將 CompletableFuture 作為異步回調(diào)的上下文設(shè)置到 RPC 的異步回調(diào)中,之后將該 CompletableFuture 返回給調(diào)用者。

        public?class?AsyncExecutor<C>?{

        ????private?C?client;

        ????public?AsyncExecutor(ClientProxyFactory?clientProxyFactory,?Class<C>?clazz,?String?group,?String?version)?{
        ????????this.client?=?clientProxyFactory.getProxy(clazz,?group,?version,?true);
        ????}

        ????public?<R>?CompletableFuture<R>?async(Function<C,?R>?function)?{
        ????????CompletableFuture<R>?future?=?new?CompletableFuture<>();
        ????????ClientProxyFactory.setLocalAsyncContextAndAsyncReceiveHandler(future,?CompletableFutureAsyncCallBack.instance());
        ????????try?{
        ????????????function.apply(client);
        ????????}?catch?(Exception?e)?{
        ????????????future.completeExceptionally(e);
        ????????}
        ????????return?future;
        ????}
        }
        異步回調(diào)類
        public?class?CompletableFutureAsyncCallBack?extends?AsyncReceiveHandler?{
        ????private?static?volatile?CompletableFutureAsyncCallBack?INSTANCE;

        ????private?CompletableFutureAsyncCallBack()?{
        ????}

        ????@Override
        ????public?void?callBack(Object?context,?Object?result)?{
        ????????if?(!(context?instanceof?CompletableFuture))?{
        ????????????throw?new?IllegalStateException("the?context?must?be?CompletableFuture");
        ????????}
        ????????CompletableFuture?future?=?(CompletableFuture)?context;
        ????????if?(result?instanceof?Throwable)?{
        ????????????future.completeExceptionally((Throwable)?result);
        ????????????return;
        ????????}
        ????????log.info("result:{}",?result);
        ????????future.complete(result);
        ????}
        }

        AsyncReceiveHandler 是 RPC 的異步回調(diào)抽象類,類中的 callBack、onException 抽象方法需要子類實(shí)現(xiàn)。

        CompletableFutureAsyncCallBack 實(shí)現(xiàn)了這個(gè) callBack 抽象方法,第一個(gè)參數(shù)是我們?cè)诎b異步 RPC Client 時(shí)設(shè)置的 ?CompletableFuture 上下文,第二個(gè)參數(shù)是 RPC 返回的結(jié)果。方法中判斷 ?RPC 返回的結(jié)果是否異常,若異常通過 completeExceptionally 異常結(jié)束這個(gè) CompletableFuture,若正常通過 complete 正常結(jié)束這個(gè) CompletableFuture。

        注冊(cè)異步客戶端Bean
        @Component
        public?class?AsyncExecutorConfig?{
        ????@Autowired
        ????ClientProxyFactory?clientProxyFactory;

        ????@Bean
        ????public?AsyncExecutor<DemoService>?demoServiceAsyncExecutor()?{
        ????????return?new?AsyncExecutor<>(clientProxyFactory,?DemoService.class,?"",?"");
        ????}
        }
        異步 RPC 調(diào)用
        @Autowired
        AsyncExecutor<DemoService>?demoServiceAsyncExecutor;

        CompletableFuture<String>?pppName?=?demoServiceAsyncExecutor.async(service?->?service.hello("ppp"));

        String?name?=?pppName.join();

        異步HTTP WebClient

        WebClient 是從 Spring WebFlux 5.0 版本開始提供的一個(gè)非阻塞的基于響應(yīng)式編程的進(jìn)行 HTTP 請(qǐng)求的客戶端工具。它的響應(yīng)式編程的基于 Reactor 的。

        WebClient VS RestTemplate

        WebClient的優(yōu)勢(shì)在于:

        • 非阻塞響應(yīng)式 IO,單位時(shí)間內(nèi)有限資源下支持更高的并發(fā)量。
        • 支持使用 Java8 Lambda 表達(dá)式函數(shù)。
        • 支持同步、異步、Stream 流式傳輸。
        WebClient 使用
        public?CompletableFuture<String>?asyncHttp(String?url)?{
        ????????WebClient?localhostWebClient?=?WebClient.builder().baseUrl("http://localhost:8080").build();
        ????????Mono<HttpResult<String>>?userMono?=?localhostWebClient.method(HttpMethod.GET).uri(url)
        ????????????????.retrieve()
        ????????????????.bodyToMono(new?ParameterizedTypeReference<HttpResult<String>>()?{})
        ????????????????//異常處理?有onErrorReturn時(shí)doOnError不會(huì)觸發(fā),所以不需要后續(xù)在CompletableFuture中handle處理異常
        ????????????????//如果不使用onErrorReturn,建議在后續(xù)CompletableFuture中handle處理異常
        ????????????????.onErrorReturn(new?HttpResult<>(201,?"default",?"default?hello"))
        ????????????????//超時(shí)處理
        ????????????????.timeout(Duration.ofSeconds(3))
        ????????????????//返回值過濾
        ????????????????.filter(httpResult?->?httpResult.code?==?200)
        ????????????????//默認(rèn)值
        ????????????????.defaultIfEmpty(new?HttpResult<>(201,?"defaultIfEmpty",?"defaultIfEmpty?hello"))
        ????????????????//失敗重試
        ????????????????.retryWhen(Retry.backoff(1,?Duration.ofSeconds(1)));
        ????????CompletableFuture<HttpResult<String>>?stringCompletableFuture?=?WebClientFutureFactory.getCompletableFuture(userMono);
        ????????return?stringCompletableFuture.thenApply(HttpResult::getData);
        ????}
        WebClient 整合 CF

        WebClientFutureFactory.getCompletableFuture 方法會(huì)把 WebClient 返回的結(jié)果組裝成 CompletableFuture ,使用的是 Mono 類的 doOnError 和 subscribe 方法,當(dāng)正常返回時(shí)通過 subscribe 來調(diào)用 completableFuture.complete,當(dāng)異常時(shí)通過 doOnError 來調(diào)用 completableFuture.completeExceptionally:

        public?class?WebClientFutureFactory?{
        ????public?static?<T>?CompletableFuture<T>?getCompletableFuture(Mono<T>?mono)?{
        ????????CompletableFuture<T>?completableFuture?=?new?CompletableFuture<>();
        ????????mono.doOnError(throwable?->?{
        ????????????completableFuture.completeExceptionally(throwable);
        ????????????log.error("mono.doOnError?throwable:{}",?throwable.getMessage());
        ????????}).subscribe(result?->?{
        ????????????completableFuture.complete(result);
        ????????????log.debug("mono.subscribe?execute?thread:?{}",?Thread.currentThread().getName());
        ????????});
        ????????return?completableFuture;
        ????}
        }

        WebClient 對(duì)同一服務(wù)的多次調(diào)用:

        public?Flux<User>?fetchUsers(List<Integer>?userIds)?{
        ????return?Flux.fromIterable(userIds)
        ????????.parallel()
        ????????.flatMap(this::getUser)
        ????????.ordered((u1,?u2)?->?u2.id()?-?u1.id());
        }

        對(duì)返回相同類型的不同服務(wù)進(jìn)行多次調(diào)用:

        public?Flux<User>?fetchUserAndOtherUser(int?id)?{
        ????return?Flux.merge(getUser(id),?getOtherUser(id))
        ????????.parallel()
        ????????.runOn(Schedulers.elastic())
        ????????.ordered((u1,?u2)?->?u2.id()?-?u1.id());
        }

        對(duì)不同類型的不同服務(wù)的多次調(diào)用:

        public?Mono?fetchUserAndItem(int?userId,?int?itemId)?{
        ????Mono<User>?user?=?getUser(userId).subscribeOn(Schedulers.elastic());
        ????Mono<Item>?item?=?getItem(itemId).subscribeOn(Schedulers.elastic());
        ?
        ????return?Mono.zip(user,?item,?UserWithItem::new);
        }

        異步數(shù)據(jù)庫(kù)調(diào)用

        使用 CompletableFuture.supplyAsync 執(zhí)行異步任務(wù)時(shí),必須指定成自己的線程池,否則 CompletableFuture 會(huì)使用默認(rèn)的線程池 ForkJoinPool,默認(rèn)線程池?cái)?shù)量為 cpus - 1:

        WebClient<Boolean>?dbFuture?=?CompletableFuture.supplyAsync(()?->?getDb(id),?ThreadPoolConfig.ASYNC_TASK_EXECUTOR);

        編排 CF

        構(gòu)造了所有需要異步執(zhí)行的 CompletableFuture 之后,使用 allOf 方法阻塞等待所有的 CompletableFuture 結(jié)果,allOf 響應(yīng)之后可以通過 join 獲取各個(gè) CompletableFuture 的響應(yīng)接口,這里的 join 是會(huì)立刻返回的,不會(huì)阻塞:

        //RPC?的?CompletableFuture
        CompletableFuture<String>?pppName?=?demoServiceAsyncExecutor.async(service?->?service.hello("ppp"));

        //RPC?的?CompletableFuture
        CompletableFuture<String>?huangName?=?demoServiceAsyncExecutor.async(service?->?service.hello("huang"));

        //DB?操作的?CompletableFuture
        WebClient<Boolean>?dbFuture?=?CompletableFuture.supplyAsync(()?->?getDb(id),?ThreadPoolConfig.ASYNC_TASK_EXECUTOR);

        //allOf?方法阻塞等待所有的?CompletableFuture?結(jié)果?????
        return?CompletableFuture.allOf(pppName,?huangName,?dbFuture)
        ?????//組裝結(jié)果返回
        ????????.thenApply(r?->?pppName.join()?&&?huangName.join()?&&?dbFuture.join()).join();

        超時(shí)處理

        java9 中 CompletableFuture 才有超時(shí)處理,使用方法如下:

        CompletableFuture.supplyAsync(()?->?6?/?3).orTimeout(1,?TimeUnit.SECONDS);

        java8 中需要配合 ScheduledExecutorService + applyToEither:

        public?class?TimeoutUtils?{
        ????private?static?final?ScheduledExecutorService?scheduledExecutor?=?Executors.newScheduledThreadPool(1);

        ????static?{
        ????????Runtime.getRuntime().addShutdownHook(new?Thread(scheduledExecutor::shutdownNow));
        ????}

        ????public?static?<T>?CompletableFuture<T>?timeout(CompletableFuture<T>?cf,?long?timeout,?TimeUnit?unit)?{
        ????????CompletableFuture<T>?result?=?new?CompletableFuture<>();
        ????????scheduledExecutor.schedule(()?->?result.completeExceptionally(new?TimeoutException()),?timeout,?unit);
        ????????return?cf.applyToEither(result,?Function.identity());
        ????}
        }

        TimeoutUtils 類與有一個(gè)靜態(tài)屬性,值為初始化的一個(gè) ScheduledExecutorService ,還有一個(gè)靜態(tài)方法 timeout ,這個(gè)方法將傳入的 cf ?用 ?applyToEither 接口與一個(gè)調(diào)度計(jì)時(shí)的 CompletableFuture 組合,哪個(gè) CompletableFuture 先執(zhí)行完成,就返回哪個(gè)的結(jié)果。

        具體使用如下:

        CompletableFuture<Integer>?future?=?demoServiceAsyncExecutor.async(service?->?service.getAge(18));
        CompletableFuture<Integer>?futureWithTimeout?=?TimeoutUtils.timeout(future,?3,?TimeUnit.SECONDS);
        futureWithTimeout.join();

        異常與默認(rèn)值處理

        CompletableFuture 中可以處理異常有下面三個(gè) API :

        public?<U>?CompletableFuture<U>?handle(java.util.function.BiFunction<??super?T,?Throwable,???extends?U>?fn)

        handle 接口不論 CompletableFuture 執(zhí)行成功還是異常都會(huì)被處罰,handle 接受一個(gè) BiFunction 參數(shù),BiFunction 中的第一個(gè)參數(shù)為 CompletableFuture 的結(jié)果,另一個(gè)參數(shù)為 CompletableFuture 執(zhí)行過程中的異常,Handle可以返回任意類型的值??梢越o handle 傳入自定義函數(shù),根據(jù)結(jié)果跟執(zhí)行異常返回最終數(shù)據(jù)。

        public?CompletableFuture<T>?whenComplete(java.util.function.BiConsumer<??super?T,???super?Throwable>?action)

        whenComplete 接口與 handle 類似,whenComplete 接受一個(gè) BiConsumer 參數(shù),BiConsumer 中的第一個(gè)參數(shù)為 CompletableFuture 的結(jié)果,另一個(gè)參數(shù)為 CompletableFuture 執(zhí)行過程中的異常,但是沒有返回值。

        public?CompletableFuture<T>?exceptionally(java.util.function.Function<Throwable,???extends?T>?fn)

        exceptionally 接口只有在執(zhí)行異常的時(shí)候才會(huì)被觸發(fā),接受一個(gè) Function 參會(huì), Function 只有一個(gè)參數(shù)為 CompletableFuture 執(zhí)行過程中的異常,可以有一個(gè)任意返回值。

        下表是三個(gè)接口的對(duì)比:


        handle()whenComplete()exceptionly()
        訪問成功YesYesNo
        訪問失敗YesYesYes
        能從失敗中恢復(fù)YesNoYes
        能轉(zhuǎn)換結(jié)果從T 到 UYesNoNo
        成功時(shí)觸發(fā)YesYesNo
        失敗時(shí)觸發(fā)YesYesYes
        有異步版本YesYesYes

        我們使用 handle 接口來處理異常與默認(rèn)值,下面是封裝的一個(gè) handle 接口入?yún)ⅲ?/p>

        public?class?DefaultValueHandle<R>?extends?AbstractLogAction<R>?implements?BiFunction<R,?Throwable,?R>?{
        ???public?DefaultValueHandle(boolean?isNullToDefault,?R?defaultValue,?String?methodName,?Object...?args)?{
        ????????super(methodName,?args);
        ????????this.defaultValue?=?defaultValue;
        ????????this.isNullToDefault?=?isNullToDefault;
        ????}
        ???@Override
        ????public?R?apply(R?result,?Throwable?throwable)?{
        ????????logResult(result,?throwable);
        ????????if?(throwable?!=?null)?{
        ????????????return?defaultValue;
        ????????}
        ????????if?(result?==?null?&&?isNullToDefault)?{
        ????????????return?defaultValue;
        ????????}
        ????????return?result;
        ????}
        }

        這個(gè)類實(shí)現(xiàn)了 handle 接口需要的 BiFunction 類型,在構(gòu)造方法中有四個(gè)參數(shù) boolean isNullToDefault, R defaultValue, String methodName, Object... args 第一個(gè)參數(shù)是決定執(zhí)行結(jié)果為空值時(shí),是否將我們傳進(jìn)來的第二個(gè)參數(shù)作為默認(rèn)值返回。當(dāng)異常時(shí)也會(huì)將第二個(gè)參數(shù)作為默認(rèn)返回值。最后兩個(gè)參數(shù)一個(gè)是方法名稱,一個(gè)是調(diào)用參數(shù),可以給父類用作日志記錄。

        與 CompletableFuture 配合使用如下:

        CompletableFuture<String>?pppName?=?demoServiceAsyncExecutor.async(service?->?service.hello("ppp"))
        ????????.handle(new?DefaultValueHandle<>(true,?"name",?"service.hello",?"ppp"));

        日志

        封裝了一個(gè)實(shí)現(xiàn) BiConsumer 的 LogErrorAction 類,父類有個(gè)抽象類 AbstractLogAction 這個(gè)類就是簡(jiǎn)單使用 logReslut 方法記錄日志,可以自己隨意實(shí)現(xiàn):

        public?class?LogErrorAction<R>?extends?AbstractLogAction<R>?implements?BiConsumer<R,?Throwable>{
        ??@Override
        ????public?void?accept(R?result,?Throwable?throwable)?{
        ????????logResult(result,?throwable);
        ????}
        }

        與 CompletableFuture 配合使用如下:

        CompletableFuture<String>?pppName?=?demoServiceAsyncExecutor.async(service?->?service.hello("ppp"))
        ??.whenComplete(
        ??new?LogErrorAction<>("hello",?"ppp")
        ?);

        優(yōu)化效果

        優(yōu)化前接口平均響應(yīng)耗時(shí) 350ms,優(yōu)化后平均響應(yīng)耗時(shí) 180ms,下降 49% 左右。

        最佳實(shí)踐

        • 禁止嵌套 join,避免“死鎖”(線程阻塞)。
        • 多個(gè) CompletableFuture 聚合時(shí)建議使用 allOf。
        • HTTP 使用無阻塞的 Spring webclient,避免自定義線程池線程阻塞。
        • 使用 RPC 或者 HTTP 異步調(diào)用生成的 CompletableFuture, 后續(xù)的 thenAppply,handle 等禁止耗時(shí)操作,避免阻塞異步框架線程池。
        • 禁止使用 CompletableFuture 的默認(rèn)線程池,不同任務(wù)自定義線程池,不同級(jí)別業(yè)務(wù)線程池隔離,根據(jù)測(cè)試情況設(shè)置線程數(shù),隊(duì)列長(zhǎng)度,拒絕策略。
        • 異步執(zhí)行的操作都加上超時(shí),CF 超時(shí)后不會(huì)終止線程中的超時(shí)任務(wù),不設(shè)置超時(shí)可能導(dǎo)致線程長(zhǎng)時(shí)間阻塞。
        • 建議使用異常、默認(rèn)值、空值替換、錯(cuò)誤日志等工具記錄信息,方便排查問題。

        示例代碼

        RPC 框架代碼:https://github.com/PPPHUANG/rpc-spring-starter

        異步調(diào)用demo:https://github.com/PPPHUANG/rpc-spring-starter-demo

        總結(jié)

        • 為什么需要異步調(diào)用
        • CompletableFuture 基本使用
        • RPC 異步調(diào)用
        • HTTP 異步調(diào)用
        • 編排 CompletableFuture 提高吞吐量

        往期精彩文章:


        如何使用注解優(yōu)雅的記錄操作日志


        你買的云服務(wù)器,可能正泡在水里。


        一次完整的JVM堆外內(nèi)存泄漏故障排查記錄


        記一次線上RPC超時(shí)故障排查及后續(xù)GC調(diào)優(yōu)思路


        管理訂單狀態(tài),該用上狀態(tài)機(jī)嗎?




        瀏覽 136
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
          
          

            1. 国产禁止18无遮挡10000部 | 女人脱下内裤让男人捅 | 中文字幕成人在线播放 | 亚洲97人人视 | 特級西西444WWw高清大膽 |