Gobrs-Async高性能異步編排框架
Gobrs-Async 是一款功能強(qiáng)大、配置靈活、帶有全鏈路異常回調(diào)、內(nèi)存優(yōu)化、異常狀態(tài)管理于一身的高性能異步編排框架。為企業(yè)提供在復(fù)雜應(yīng)用場景下動態(tài)任務(wù)編排的能力。 針對于復(fù)雜場景下,異步線程復(fù)雜性、任務(wù)依賴性、異常狀態(tài)難控制性; Gobrs-Async 為此而生。
資源索引
能解決什么問題
能解決 CompletableFuture 所不能解決的問題。 怎么理解呢?
傳統(tǒng)的Future、CompleteableFuture一定程度上可以完成任務(wù)編排,并可以把結(jié)果傳遞到下一個任務(wù)。如CompletableFuture有then方法,但是卻無法做到對每一個執(zhí)行單元的回調(diào)。譬如A執(zhí)行完畢成功了,后面是B,我希望A在執(zhí)行完后就有個回調(diào)結(jié)果,方便我監(jiān)控當(dāng)前的執(zhí)行狀況,或者打個日志什么的。失敗了,我也可以記錄個異常信息什么的。
此時(shí),CompleteableFuture就無能為力了。
Gobrs-Async框架提供了這樣的回調(diào)功能。并且,如果執(zhí)行成功、失敗、異常、超時(shí)等場景下都提供了管理線程任務(wù)的能力!
場景概述
場景一
場景一
說明 任務(wù)A 執(zhí)行完了之后,繼續(xù)執(zhí)行 B、C、D
場景二
場景二
說明 任務(wù)A 執(zhí)行完了之后執(zhí)行B 然后再執(zhí)行 C、D
場景三
場景二
說明 任務(wù)A 執(zhí)行完了之后執(zhí)行B、E 然后按照順序 B的流程走C、D、G。 E的流程走F、G
還有更多場景,如果你想詳細(xì)理解任務(wù)編排的概念, 請仔細(xì)閱讀文檔,或者通過資源索引導(dǎo)航到官網(wǎng)了解全貌!
為什么寫這個項(xiàng)目
在開發(fā)復(fù)雜中臺業(yè)務(wù)過程中,難免會遇到調(diào)用各種中臺業(yè)務(wù)數(shù)據(jù), 而且會出現(xiàn)復(fù)雜的中臺數(shù)據(jù)依賴關(guān)系,在這種情況下。代碼的復(fù)雜程度就會增加。 如下圖所示:
在電商平臺業(yè)務(wù)中, 各中臺數(shù)據(jù)可能依賴 商品Product 數(shù)據(jù),而且需要依賴特殊屬性中 Item的數(shù)據(jù)。(有朋友會問,為什么Product 數(shù)據(jù)不和 Item數(shù)據(jù)出自同一個中臺呢?中臺業(yè)務(wù)發(fā)展是多樣性的,不同業(yè)務(wù)中臺設(shè)計(jì)方式不同 , 難道我們就不對接了嗎?所以我們要針對于這種復(fù)雜多變的中臺業(yè)務(wù)數(shù)據(jù)提供技術(shù)支撐才是一個合格的開發(fā)者應(yīng)該做的)而且Item數(shù)據(jù)是HTTP的服務(wù),但Product 是RPC服務(wù)。 如果按照Future的 開發(fā)方式。我們可能會這樣開發(fā)
// 并行處理任務(wù) Product 、 Item 的任務(wù)
@Resource
List<ParaExector> paraExectors;
// 依賴于Product 和 Item的 任務(wù)
@Resource
List<SerExector> serExectors;
public void testFuture(HttpServletRequest httpServletRequest) {
DataContext dataContext = new DataContext();
dataContext.setHttpServletRequest(httpServletRequest);
List<Future> list = new ArrayList<>();
for (AsyncTask asyncTask : paraExectors) {
Future<?> submit = gobrsThreadPoolExecutor.submit(() -> {
asyncTask.task(dataContext, null);
});
list.add(submit);
}
for (Future future : list) {
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
List<Future> ser = new ArrayList<>();
for (AsyncTask asyncTask : serExectors) {
Future<?> submit = gobrsThreadPoolExecutor.submit(() -> {
asyncTask.task(dataContext, null);
});
ser.add(submit);
}
for (Future future : ser) {
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
存在的問題
以上示例中,Product數(shù)據(jù)是通過RPC 方式獲取, Item是通過HTTP服務(wù)獲取,大家都知道, RPC性能要高于HTTP性能。 但是通過Future 的方式, get會阻塞等待 Item數(shù)據(jù)返回后才會往下執(zhí)行。 這樣的話, 圖書音像、裝修數(shù)據(jù)、限購數(shù)據(jù)等都要等待Item數(shù)據(jù)返回,但是這些中臺并不依賴Item返回的數(shù)據(jù), 所以會產(chǎn)生等待時(shí)間影響系統(tǒng)整體QPS。
起源
起源
-
作者通過對開源中間件的源碼詳細(xì)閱讀和二次開發(fā)的經(jīng)驗(yàn)和使用心得總結(jié)而來。
-
用戶的一些使用體驗(yàn) 包括業(yè)務(wù)的需求
Gobrs-Async 核心能力
核心能力
業(yè)界對比
在開源平臺找了挺多任務(wù)異步編排框架,發(fā)現(xiàn)都不是很理想,唯一一款現(xiàn)階段比較好用的異步編排框架就數(shù)asyncTool比較好用。但是在使用時(shí)發(fā)現(xiàn),API不是很好用。而且需要頻繁的創(chuàng)建 WorkerWrapper 對象 用起來有點(diǎn)不爽。對于業(yè)務(wù)比較復(fù)雜的場景,在開發(fā)時(shí)需要寫較多的 WorkerWrapper 代碼,而且框架不能對全局異常進(jìn)行攔截。只能通過任務(wù)的 result 方法捕捉單任務(wù)的異常。 不能在任意任務(wù)出現(xiàn)異常后 停止全局的異步任務(wù)。同時(shí)無法在發(fā)生全局異常的時(shí)候進(jìn)行異常攔截。如果需要實(shí)現(xiàn)在發(fā)生需停止全局任務(wù)流的時(shí)候,發(fā)送報(bào)警郵件的功能。 asyncTool就顯得力不從心了。
asyncTool 本身已經(jīng)功能很強(qiáng)大了,本人與asyncTool 作者 都是在京東擔(dān)任研發(fā)工作。 涉及到的場景大同小異。 會有很多業(yè)務(wù)場景,很復(fù)雜的中臺接口調(diào)用關(guān)系。所以針對當(dāng)前業(yè)務(wù)場景。需要探索更多的技術(shù)領(lǐng)域。本身技術(shù)是應(yīng)該服務(wù)于業(yè)務(wù),落地業(yè)務(wù)場景。
想給項(xiàng)目起一個簡單易記的名字,類似于 Eureka、Nacos、Redis;經(jīng)過再三考慮后,決定命名:Gobrs-Async
| 功能 | asyncTool | Gobrs-Async | sirector |
|---|---|---|---|
| 多任務(wù)處理 | 是 | 是 | 是 |
| 單任務(wù)異?;卣{(diào) | 是 | 是 | 否 |
| 全局異常中斷 | 否 | 是 | 否 |
| 可配置任務(wù)流 | 否 | 是 | 否 |
| 自定義異常攔截器 | 否 | 是 | 否 |
| 內(nèi)存優(yōu)化 | 否 | 是 | 否 |
| 可選的任務(wù)執(zhí)行 | 否 | 是 | 否 |
它解決了什么問題
在請求調(diào)用各大中臺數(shù)據(jù)時(shí),難免會出現(xiàn)多個中臺數(shù)據(jù)互相依賴的情況,現(xiàn)實(shí)開發(fā)中會遇到如下場景。
并行常見的場景 1 客戶端請求服務(wù)端接口,該接口需要調(diào)用其他N個微服務(wù)的接口
譬如 請求我的購物車,那么就需要去調(diào)用用戶的rpc、商品詳情的rpc、庫存rpc、優(yōu)惠券等等好多個服務(wù)。同時(shí),這些服務(wù)還有相互依賴關(guān)系,譬如必須先拿到商品id后,才能去庫存rpc服務(wù)請求庫存信息。 最終全部獲取完畢后,或超時(shí)了,就匯總結(jié)果,返回給客戶端。
2 并行執(zhí)行N個任務(wù),后續(xù)根據(jù)這1-N個任務(wù)的執(zhí)行結(jié)果來決定是否繼續(xù)執(zhí)行下一個任務(wù)
如用戶可以通過郵箱、手機(jī)號、用戶名登錄,登錄接口只有一個,那么當(dāng)用戶發(fā)起登錄請求后,我們需要并行根據(jù)郵箱、手機(jī)號、用戶名來同時(shí)查數(shù)據(jù)庫,只要有一個成功了,都算成功,就可以繼續(xù)執(zhí)行下一步。而不是先試郵箱能否成功、再試手機(jī)號……
再如某接口限制了每個批次的傳參數(shù)量,每次最多查詢10個商品的信息,我有45個商品需要查詢,就可以分5堆并行去查詢,后續(xù)就是統(tǒng)計(jì)這5堆的查詢結(jié)果。就看你是否強(qiáng)制要求全部查成功,還是不管有幾堆查成功都給客戶做返回
再如某個接口,有5個前置任務(wù)需要處理。其中有3個是必須要執(zhí)行完畢才能執(zhí)行后續(xù)的,另外2個是非強(qiáng)制的,只要這3個執(zhí)行完就可以進(jìn)行下一步,到時(shí)另外2個如果成功了就有值,如果還沒執(zhí)行完,就是默認(rèn)值。
3 需要進(jìn)行線程隔離的多批次任務(wù)。
如多組任務(wù), 各組任務(wù)之間彼此不相關(guān),每組都需要一個獨(dú)立的線程池,每組都是獨(dú)立的一套執(zhí)行單元的組合。有點(diǎn)類似于hystrix的線程池隔離策略。
4 單機(jī)工作流任務(wù)編排。
5 其他有順序編排的需求。
它有什么特性
Gobrs-Async 在開發(fā)時(shí)考慮了眾多使用者的開發(fā)喜歡,對異常處理的使用場景。并被運(yùn)用到電商生產(chǎn)環(huán)境中,在京東經(jīng)歷這嚴(yán)酷的高并發(fā)考驗(yàn)。同時(shí)框架中 極簡靈活的配置、全局自定義可中斷全流程異常、內(nèi)存優(yōu)化、靈活的接入方式、提供SpringBoot Start 接入方式。更加考慮使用者的開發(fā)習(xí)慣。僅需要注入GobrsTask的Spring Bean 即可實(shí)現(xiàn)全流程接入。
Gobrs-Async 項(xiàng)目目錄及其精簡
-
gobrs-async-example:Gobrs-Async 接入實(shí)例,提供測試用例。 -
gobrs-async-starter:Gobrs-Async 框架核心組件
Gobrs-Async 在設(shè)計(jì)時(shí),就充分考慮了開發(fā)者的使用習(xí)慣, 沒有依賴任何中間件。 對并發(fā)框架做了良好的封裝。主要使用 CountDownLatch 、ReentrantLock 、volatile 等一系列并發(fā)技術(shù)開發(fā)設(shè)計(jì)。
整體架構(gòu)
1.0
任務(wù)觸發(fā)器
任務(wù)流的啟動者, 負(fù)責(zé)啟動任務(wù)執(zhí)行流
規(guī)則解析引擎
負(fù)責(zé)解析使用者配置的規(guī)則,同時(shí)于Spring結(jié)合,將配置的 Spring Bean 解析成 TaskBean,進(jìn)而通過解析引擎加載成 任務(wù)裝飾器。進(jìn)而組裝成任務(wù)樹
任務(wù)啟動器
負(fù)責(zé)通過使用解析引擎解析的任務(wù)樹。結(jié)合 JUC 并發(fā)框架調(diào)度實(shí)現(xiàn)對任務(wù)的統(tǒng)一管理,核心方法有
-
trigger 觸發(fā)任務(wù)加載器,為加載任務(wù)準(zhǔn)備環(huán)境
任務(wù)加載器
負(fù)責(zé)加載任務(wù)流程,開始調(diào)用任務(wù)執(zhí)行器執(zhí)行核心流程
-
load 核心任務(wù)流程方法,在這里阻塞等待整個任務(wù)流程
-
getBeginProcess 獲取子任務(wù)開始流程
-
completed 任務(wù)完成
-
errorInterrupted 任務(wù)失敗 中斷任務(wù)流程
-
error 任務(wù)失敗
任務(wù)執(zhí)行器
最終的任務(wù)執(zhí)行,每一個任務(wù)對應(yīng)一個TaskActuator 任務(wù)的 攔截、異常、執(zhí)行、線程復(fù)用 等必要條件判斷都在這里處理
-
prepare 任務(wù)前置處理
-
preInterceptor 統(tǒng)一任務(wù)前置處理
-
task 核心任務(wù)方法,業(yè)務(wù)執(zhí)行內(nèi)容
-
postInterceptor 統(tǒng)一后置處理
-
onSuccess 任務(wù)執(zhí)行成功回調(diào)
-
onFail 任務(wù)執(zhí)行失敗回調(diào)
任務(wù)總線
任務(wù)流程傳遞總線,包括 請求參數(shù)、任務(wù)加載器、 響應(yīng)結(jié)果, 該對象暴露給使用者,拿到匹配業(yè)務(wù)的數(shù)據(jù)信息,例如: 返回結(jié)果、主動中斷任務(wù)流程等功能 需要任務(wù)總線(TaskSupport)支持
核心類圖
