1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        請(qǐng)求合并的三種方式,大大提高接口性能!

        共 4162字,需瀏覽 9分鐘

         ·

        2021-12-28 01:30

        點(diǎn)擊關(guān)注公眾號(hào),Java干貨及時(shí)送達(dá)

        作者:zhenbianshu

        來源:https://zhenbianshu.github.io/


        將相似或重復(fù)請(qǐng)求在上游系統(tǒng)中合并后發(fā)往下游系統(tǒng),可以大大降低下游系統(tǒng)的負(fù)載,提升系統(tǒng)整體吞吐率。
        文章介紹了 hystrix collapser、ConcurrentHashMultiset、自實(shí)現(xiàn)BatchCollapser 三種請(qǐng)求合并技術(shù),并通過其具體實(shí)現(xiàn)對(duì)比各自適用的場景。

        前言

        工作中,我們常見的請(qǐng)求模型都是”請(qǐng)求-應(yīng)答”式,即一次請(qǐng)求中,服務(wù)給請(qǐng)求分配一個(gè)獨(dú)立的線程,一塊獨(dú)立的內(nèi)存空間,所有的操作都是獨(dú)立的,包括資源和系統(tǒng)運(yùn)算。我們也知道,在請(qǐng)求中處理一次系統(tǒng) I/O 的消耗是非常大的,如果有非常多的請(qǐng)求都進(jìn)行同一類 I/O 操作,那么是否可以將這些 I/O 操作都合并到一起,進(jìn)行一次 I/O 操作,是否可以大大降低下游資源服務(wù)器的負(fù)擔(dān)呢?

        最近我工作之余的大部分時(shí)間都花在這個(gè)問題的探究上了,對(duì)比了幾個(gè)現(xiàn)有類庫,為了解決一個(gè)小問題把 hystrix javanica 的代碼翻了一遍,也根據(jù)自己工作中遇到的業(yè)務(wù)需求實(shí)現(xiàn)了一個(gè)簡單的合并類,收獲還是挺大的。可能這個(gè)需求有點(diǎn)”偏門”,在網(wǎng)上搜索結(jié)果并不多,也沒有綜合一點(diǎn)的資料,索性自己總結(jié)分享一下,希望能幫到后來遇到這種問題的小伙伴。

        Hystrix Collapser

        hystrix

        開源的請(qǐng)求合并類庫(知名的)好像也只有 Netflix 公司開源的 Hystrix 了, hystrix 專注于保持 WEB 服務(wù)器在高并發(fā)環(huán)境下的系統(tǒng)穩(wěn)定,我們常用它的熔斷器(Circuit Breaker) 來實(shí)現(xiàn)服務(wù)的服務(wù)隔離和災(zāi)時(shí)降級(jí),有了它,可以使整個(gè)系統(tǒng)不至于被某一個(gè)接口的高并發(fā)洪流沖塌,即使接口掛了也可以將服務(wù)降級(jí),返回一個(gè)人性化的響應(yīng)。請(qǐng)求合并作為一個(gè)保障下游服務(wù)穩(wěn)定的利器,在 hystrix 內(nèi)實(shí)現(xiàn)也并不意外。

        我們?cè)谑褂?hystrix 時(shí),常用它的 javanica 模塊,以注解的方式編寫 hystrix 代碼,使代碼更簡潔而且對(duì)業(yè)務(wù)代碼侵入更低。所以在項(xiàng)目中我們一般至少需要引用 hystrix-core 和 hystrix-javanica 兩個(gè)包。

        另外,hystrix 的實(shí)現(xiàn)都是通過 AOP,我們要還要在項(xiàng)目 xml 里顯式配置 HystrixAspect 的 bean 來啟用它。


        "hystrixAspect"?class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect"?/>
        collapser

        hystrix collapser 是 hystrix 內(nèi)的請(qǐng)求合并器,它有自定義 BatchMethod 和 注解兩種實(shí)現(xiàn)方式,自定義 BatchMethod 網(wǎng)上有各種教程,實(shí)現(xiàn)起來很復(fù)雜,需要手寫大量代碼,而注解方式只需要添加兩行注解即可,但配置方式我在官方文檔上也沒找見,中文方面本文應(yīng)該是獨(dú)一份兒了。

        其實(shí)現(xiàn)需要注意的是:

        • 我們?cè)谛枰喜⒌姆椒ㄉ咸砑?@HystrixCollapser 注解,在定義好的合并方法上添加 @HystrixCommand 注解;
        • single 方法只能傳入一個(gè)參數(shù),多參數(shù)情況下需要自己包裝一個(gè)參數(shù)類,而 batch 方法需要 java.util.List;
        • single 方法返回 java.util.concurrent.Future, batch 方法返回 java.util.List,且要保證返回的結(jié)果數(shù)量和傳入的參數(shù)數(shù)量一致。

        下面是一個(gè)簡單的示例:

        public?class?HystrixCollapserSample?{
        ????@HystrixCollapser(batchMethod?=?"batch")
        ????public?Future?single(String?input)?{
        ????????return?null;?//?single方法不會(huì)被執(zhí)行到
        ????}
        ????public?List?batch(List?inputs)?{
        ????????return?inputs.stream().map(it?->?Boolean.TRUE).collect(Collectors.toList());
        ????}
        }
        源碼實(shí)現(xiàn)

        為了解決 hystrix collapser 的配置問題看了下 hystrix javanica 的源碼,這里簡單總結(jié)一下 hystrix 請(qǐng)求合并器的具體實(shí)現(xiàn),源碼的詳細(xì)解析在我的筆記:Hystrix collasper 源碼解析。

        • 在 spring-boot 內(nèi)注冊(cè)切面類的 bean,里面包含 @HystrixCollapser 注解切面;
        • 在方法執(zhí)行時(shí)檢測到方法被 HystrixCollapser 注解后,spring 調(diào)用 methodsAnnotatedWithHystrixCommand方法來執(zhí)行 hystrix 代理;
        • hystrix 獲取一個(gè) collapser 實(shí)例(在當(dāng)前 scope 內(nèi)檢測不到即創(chuàng)建);
        • hystrix 將當(dāng)前請(qǐng)求的參數(shù)提交給 collapser, 由 collapser 存儲(chǔ)在一個(gè) concurrentHashMap (RequestArgumentType -> CollapsedRequest)內(nèi),此方法會(huì)創(chuàng)建一個(gè) Observable 對(duì)象,并返回一個(gè) 觀察此對(duì)象的 Future 給業(yè)務(wù)線程;
        • collpser 在創(chuàng)建時(shí)會(huì)創(chuàng)建一個(gè) timer 線程,定時(shí)消費(fèi)存儲(chǔ)的請(qǐng)求,timer 會(huì)將多個(gè)請(qǐng)求構(gòu)造成一個(gè)合并后的請(qǐng)求,調(diào)用 batch 執(zhí)行后將結(jié)果順序映射到輸出參數(shù),并通知 Future 任務(wù)已完成。

        需要注意,由于需要等待 timer 執(zhí)行真正的請(qǐng)求操作,collapser 會(huì)導(dǎo)致所有的請(qǐng)求的 cost 都會(huì)增加約 timerInterval/2 ms;

        配置

        hystrix collapser 的配置需要在 @HystrixCollapser 注解上使用,主要包括兩個(gè)部分,專有配置和 hystrixCommand 通用配置;

        專有配置包括:

        • collapserKey,這個(gè)可以不用配置,hystrix 會(huì)默認(rèn)使用當(dāng)前方法名;
        • batchMethod,配置 batch 方法名,我們一般會(huì)將 single 方法和 batch 方法定義在同一個(gè)類內(nèi),直接填方法名即可;
        • scope,最坑的配置項(xiàng),也是逼我讀源碼的元兇,com.netflix.hystrix.HystrixCollapser.Scope 枚舉類,有 REQUEST, GLOBAL 兩種選項(xiàng),在 scope 為 REQUEST 時(shí),hystrix 會(huì)為每個(gè)請(qǐng)求都創(chuàng)建一個(gè) collapser, 此時(shí)你會(huì)發(fā)現(xiàn) batch 方法執(zhí)行時(shí),傳入的請(qǐng)求數(shù)總為1。而且 REQUEST 項(xiàng)還是默認(rèn)項(xiàng),不明白這樣請(qǐng)求合并還有什么意義;
        • collapserProperties, 在此選項(xiàng)內(nèi)我們可以配置 hystrixCommand 的通用配置;

        通用配置包括:

        • maxRequestsInBatch, 構(gòu)造批量請(qǐng)求時(shí),使用的單個(gè)請(qǐng)求的最大數(shù)量;
        • timerDelayInMilliseconds, 此選項(xiàng)配置 collapser 的 timer 線程多久會(huì)合并一次請(qǐng)求;
        • requestCache.enabled, 配置提交請(qǐng)求時(shí)是否緩存;

        一個(gè)完整的配置如下:

        @HystrixCollapser(
        ????????????batchMethod?=?"batch",
        ????????????collapserKey?=?"single",
        ????????????scope?=?com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
        ????????????collapserProperties?=?{
        ????????????????????@HystrixProperty(name?=?"maxRequestsInBatch",?value?=?"100"),
        ????????????????????@HystrixProperty(name?=?"timerDelayInMilliseconds",?value?=?"1000"),
        ????????????????????@HystrixProperty(name?=?"requestCache.enabled",?value?=?"true")
        ????????????})

        BatchCollapser

        設(shè)計(jì)

        由于業(yè)務(wù)需求,我們并不太關(guān)心被合并請(qǐng)求的返回值,而且覺得 hystrix 保持那么多的 Future 并沒有必要,于是自己實(shí)現(xiàn)了一個(gè)簡單的請(qǐng)求合并器,業(yè)務(wù)線程簡單地將請(qǐng)求放到一個(gè)容器里,請(qǐng)求數(shù)累積到一定量或延遲了一定的時(shí)間,就取出容器內(nèi)的數(shù)據(jù)統(tǒng)一發(fā)送給下游系統(tǒng)。

        設(shè)計(jì)思想跟 hystrix 類似,合并器有一個(gè)字段作為存儲(chǔ)請(qǐng)求的容器,且設(shè)置一個(gè) timer 線程定時(shí)消費(fèi)容器內(nèi)的請(qǐng)求,業(yè)務(wù)線程將請(qǐng)求參數(shù)提交到合并 器的容器內(nèi)。不同之處在于,業(yè)務(wù)線程將請(qǐng)求提交給容器后立即同步返回成功,不必管請(qǐng)求的消費(fèi)結(jié)果,這樣便實(shí)現(xiàn)了時(shí)間維度上的合并觸發(fā)。

        另外,我還添加了另外一個(gè)維度的觸發(fā)條件,每次將請(qǐng)求參數(shù)添加到容器后都會(huì)檢驗(yàn)一下容器內(nèi)請(qǐng)求的數(shù)量,如果數(shù)量達(dá)到一定的閾值,將在業(yè)務(wù)線程內(nèi)合并執(zhí)行一次。

        由于有兩個(gè)維度會(huì)觸發(fā)合并,就不可避免會(huì)遇到線程安全問題。為了保證容器內(nèi)的請(qǐng)求不會(huì)被多個(gè)線程重復(fù)消費(fèi)或都漏掉,我需要一個(gè)容器能滿足以下條件:

        • 是一種 Collection,類似于 ArrayList 或 Queue,可以存重復(fù)元素且有順序;
        • 在多線程環(huán)境中能安全地將里面的數(shù)據(jù)全取出來進(jìn)行消費(fèi),而不用自己實(shí)現(xiàn)鎖。

        java.util.concurrent 包內(nèi)的 LinkedBlockingDeque 剛好符合要求,首先它實(shí)現(xiàn)了 BlockingDeque 接口,多線程環(huán)境下的存取操作是安全的;此外,它還提供 drainTo(Collection c, int maxElements)方法,可以將容器內(nèi) maxElements 個(gè)元素安全地取出來,放到 Collection c 中。

        實(shí)現(xiàn)

        以下是具體的代碼實(shí)現(xiàn):

        public?class?BatchCollapser<E>?implements?InitializingBean?{
        ?????private?static?final?Logger?logger?=?LoggerFactory.getLogger(BatchCollapser.class);
        ?????private?static?volatile?Map?instance?=?Maps.newConcurrentMap();
        ?????private?static?final?ScheduledExecutorService?SCHEDULE_EXECUTOR?=?Executors.newScheduledThreadPool(1);
        ?????private?volatile?LinkedBlockingDeque?batchContainer?=?new?LinkedBlockingDeque<>();
        ?????private?Handler,?Boolean>?cleaner;
        ?????private?long?interval;
        ?????private?int?threshHold;
        ?????private?BatchCollapser(Handler,?Boolean>?cleaner,?int?threshHold,?long?interval)?{
        ?????????this.cleaner?=?cleaner;
        ?????????this.threshHold?=?threshHold;
        ?????????this.interval?=?interval;
        ?????}

        ?????@Override
        ?????public?void?afterPropertiesSet()?throws?Exception?{
        ?????????SCHEDULE_EXECUTOR.scheduleAtFixedRate(()?->?{
        ?????????????try?{
        ?????????????????this.clean();
        ?????????????}?catch?(Exception?e)?{
        ?????????????????logger.error("clean?container?exception",?e);
        ?????????????}
        ?????????},?0,?interval,?TimeUnit.MILLISECONDS);
        ?????}

        ?????public?void?submit(E?event)?{
        ?????????batchContainer.add(event);
        ?????????if?(batchContainer.size()?>=?threshHold)?{
        ?????????????clean();
        ?????????}
        ?????}

        ?????private?void?clean()?{
        ?????????List?transferList?=?Lists.newArrayListWithExpectedSize(threshHold);
        ?????????batchContainer.drainTo(transferList,?100);
        ?????????if?(CollectionUtils.isEmpty(transferList))?{
        ?????????????return;
        ?????????}

        ?????????try?{
        ?????????????cleaner.handle(transferList);
        ?????????}?catch?(Exception?e)?{
        ?????????????logger.error("batch?execute?error,?transferList:{}",?transferList,?e);
        ?????????}
        ?????}

        ?????public?static??BatchCollapser?getInstance(Handler,?Boolean>?cleaner,?int?threshHold,?long?interval)?{
        ?????????Class?jobClass?=?cleaner.getClass();
        ?????????if?(instance.get(jobClass)?==?null)?{
        ?????????????synchronized?(BatchCollapser.class)?{
        ?????????????????if?(instance.get(jobClass)?==?null)?{
        ?????????????????????instance.put(jobClass,?new?BatchCollapser<>(cleaner,?threshHold,?interval));
        ?????????????????}
        ?????????????}
        ?????????}

        ?????????return?instance.get(jobClass);
        ?????}
        ?}

        以下代碼內(nèi)需要注意的點(diǎn):

        • 由于合并器的全局性需求,需要將合并器實(shí)現(xiàn)為一個(gè)單例,另外為了提升它的通用性,內(nèi)部使用使用 concurrentHashMap 和 double check 實(shí)現(xiàn)了一個(gè)簡單的單例工廠。
        • 為了區(qū)分不同用途的合并器,工廠需要傳入一個(gè)實(shí)現(xiàn)了 Handler 的實(shí)例,通過實(shí)例的 class 來對(duì)請(qǐng)求進(jìn)行分組存儲(chǔ)。
        • 由于 java.util.Timer 的阻塞特性,一個(gè) Timer 線程在阻塞時(shí)不會(huì)啟動(dòng)另一個(gè)同樣的 Timer 線程,所以使用 ScheduledExecutorService 定時(shí)啟動(dòng) Timer 線程。

        ConcurrentHashMultiset

        設(shè)計(jì)

        上面介紹的請(qǐng)求合并都是將多個(gè)請(qǐng)求一次發(fā)送,下游服務(wù)器處理時(shí)本質(zhì)上還是多個(gè)請(qǐng)求,最好的請(qǐng)求合并是在內(nèi)存中進(jìn)行,將請(qǐng)求結(jié)果簡單合并成一個(gè)發(fā)送給下游服務(wù)器。如我們經(jīng)常會(huì)遇到的需求:元素分值累加或數(shù)據(jù)統(tǒng)計(jì),就可以先在內(nèi)存中將某一項(xiàng)的分值或數(shù)據(jù)累加起來,定時(shí)請(qǐng)求數(shù)據(jù)庫保存。

        Guava 內(nèi)就提供了這么一種數(shù)據(jù)結(jié)構(gòu):ConcurrentHashMultiset,它不同于普通的 set 結(jié)構(gòu)存儲(chǔ)相同元素時(shí)直接覆蓋原有元素,而是給每個(gè)元素保持一個(gè)計(jì)數(shù) count, 插入重復(fù)時(shí)元素的 count 值加1。而且它在添加和刪除時(shí)并不加鎖也能保證線程安全,具體實(shí)現(xiàn)是通過一個(gè) while(true) 循環(huán)嘗試操作,直到操作夠所需要的數(shù)量。

        ConcurrentHashMultiset 這種排重計(jì)數(shù)的特性,非常適合數(shù)據(jù)統(tǒng)計(jì)這種元素在短時(shí)間內(nèi)重復(fù)率很高的場景,經(jīng)過排重后的數(shù)量計(jì)算,可以大大降低下游服務(wù)器的壓力,即使重復(fù)率不高,能用少量的內(nèi)存空間換取系統(tǒng)可用性的提高,也是很劃算的。

        實(shí)現(xiàn)

        使用 ConcurrentHashMultiset 進(jìn)行請(qǐng)求合并與使用普通容器在整體結(jié)構(gòu)上并無太大差異,具體類似于:

        if?(ConcurrentHashMultiset.isEmpty())?{
        ????return;
        }

        List?transferList?=?Lists.newArrayList();
        ConcurrentHashMultiset.elementSet().forEach(request?->?{
        ????int?count?=?ConcurrentHashMultiset.count(request);
        ????if?(count?<=?0)?{
        ????????return;
        ????}

        ????transferList.add(count?==?1???request?:?new?Request(request.getIncrement()?*?count));
        ????ConcurrentHashMultiset.remove(request,?count);
        });

        小結(jié)

        最后總結(jié)一下各個(gè)技術(shù)適用的場景:

        • hystrix collapser: 需要每個(gè)請(qǐng)求的結(jié)果,并且不在意每個(gè)請(qǐng)求的 cost 會(huì)增加;
        • BatchCollapser: 不在意請(qǐng)求的結(jié)果,需要請(qǐng)求合并能在時(shí)間和數(shù)量兩個(gè)維度上觸發(fā);
        • ConcurrentHashMultiset:請(qǐng)求重復(fù)率很高的統(tǒng)計(jì)類場景;

        另外,如果選擇自己來實(shí)現(xiàn)的話,完全可以將 BatchCollapserConcurrentHashMultiset 結(jié)合一下,在BatchCollapser 里使用 ConcurrentHashMultiset 作為容器,這樣就可以結(jié)合兩者的優(yōu)勢了。


        1、Log4j2維護(hù)者吐槽沒工資還要挨罵,GO安全負(fù)責(zé)人建議開源作者向公司收費(fèi)

        2、太難了!讓程序員崩潰的8個(gè)瞬間

        3、2021年程序員們都在用的神級(jí)數(shù)據(jù)庫

        4、Windows重要功能被閹割,全球用戶怒噴數(shù)月后微軟終于悔改

        5、牛逼!國產(chǎn)開源的遠(yuǎn)程桌面火了,只有9MB 支持自建中繼器!

        6、摔到老三的 Java,未來在哪?

        7、真香!用 IDEA 神器看源碼,效率真高!

        點(diǎn)分享

        點(diǎn)收藏

        點(diǎn)點(diǎn)贊

        點(diǎn)在看

        瀏覽 27
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            黄色一级片国产 | 欧美性大战久久久久 | 美女露出让男生揉的网站韩国 | 欧美一级免费 | 91精品黄色 | 老师让我她我爽了好久动漫 | 成人欧美一区二区三区黑人一 | 性爱视频高清无码 | 日韩美女性生活视频 | jizzjizz中国少妇中文 |