Flink全鏈路延遲的測量方式和實現(xiàn)原理

本文已經(jīng)加入「大數(shù)據(jù)成神之路PDF版」中提供下載。
你可以關(guān)注公眾號,后臺回復(fù):「PDF」?即可獲取。
一、背景
Flink Job端到端延遲是一個重要的指標(biāo),用來衡量Flink任務(wù)的整體性能和響應(yīng)延遲(大部分流式應(yīng)用,要求低延遲特性)。
通過流處理引擎競品對比,我們發(fā)現(xiàn)大部分流計算引擎產(chǎn)品,都在告警監(jiān)控頁面,集成了全鏈路時延指標(biāo)展示(直方圖)。
一些低延時的處理場景,例如用于登陸、用戶下單規(guī)則檢測,實時預(yù)測場景,需要一個可度量的Metric指標(biāo),來實時觀測、監(jiān)控集群全鏈路時延情況。
二、源碼分析來源
1、本文的源碼分析基于Flink社區(qū)issue FLINK-3660,以及issue對應(yīng)的pr源碼pull-2386,另外,個人也新增了實現(xiàn)源碼的說明。
2、其pr源碼中只涉及到了部分全鏈路時延實現(xiàn)代碼,因此,我在文章中總結(jié)了:
Source到Sink處理Latency Marker源碼 LatencyMarksEmitter 提交時延標(biāo)記類 LatencyStats(時延直方圖Metric實現(xiàn))源碼
時延測量–整體架構(gòu)圖
三、騰訊Oceanus監(jiān)控指標(biāo)參考
如下圖,紅色框線對應(yīng)的數(shù)據(jù)延時,即我們描述的指標(biāo)


四、Flink LatencyMarker實現(xiàn)思路
實現(xiàn)方案變遷
在webinterface中,加入流式j(luò)ob的端到端延遲是一個重要特性。因此,F(xiàn)link社區(qū)最初的想法是在每個記錄的source上附加一個攝取時間(ingestion-time)時間戳。
然而,這為不使用monitor feature(監(jiān)控功能)的用戶,帶來了額外開銷(每個元素+每個元素上的System.currentTimeMilis()需要8個字節(jié))。
因此,F(xiàn)link社區(qū)最后決定,通過定期發(fā)送特殊事件來實現(xiàn)此功能,類似于通過拓?fù)浒l(fā)送水印watermark。
實現(xiàn)原理
這些特殊事件(LatencyMarker)在source上以可配置發(fā)送間隔,并由任務(wù)Task轉(zhuǎn)發(fā)。Sink最后接收到LatencyMarks后,將比較LatencyMarker的時間戳與當(dāng)前系統(tǒng)時間,以確定延遲。
LatencyMarker不會增加作業(yè)的延遲,但是LatencyMarker與常規(guī)記錄類似,可以被delay阻塞(例如反壓情況),因此LatencyMarker的延遲與Record延遲近似。
節(jié)點間時鐘偏移及準(zhǔn)確性
當(dāng)前方案期望所有任務(wù)管理器TaskManager上的時鐘是同步的。否則,測量的延遲也包括TaskManager時鐘之間的偏移。
后續(xù),我們可以嘗試通過使用JobManager作為計時服務(wù)中心(central timing service)來緩解這個問題。taskmanager將定期查詢JM的當(dāng)前時間,以確定其時鐘的偏移量。
這個偏移量仍然包括TM和JM之間的網(wǎng)絡(luò)延遲,但是仍然比較好的測量時延。
五、Flink LatencyMarker實現(xiàn)源碼
本章節(jié)對應(yīng)到pr源碼pull-2386的實現(xiàn),這里簡要說明。

實現(xiàn)基礎(chǔ)類及下發(fā)標(biāo)記
Flink源碼中,引入了一個新的StreamElement,稱為LatencyMarker。
與水印類似,LatencyMarker按配置的間隔從源發(fā)出。這個時間間隔的默認(rèn)值是0毫秒,即不觸發(fā)(配置項在ExecutionConfig#latencyTrackingInterval,名稱metrics.latency.interval),例如可以配置成2000毫秒觸發(fā)一次LatencyMarker發(fā)送。
LatencyMarker不能“多于”常規(guī)元素。這確保了測量的延遲接近于常規(guī)流元素的端到端延遲。
常規(guī)操作符Operator(不包括那些參與迭代的Operator)如果不是sink,就會轉(zhuǎn)發(fā)延遲標(biāo)記LatencyMarker。
多輸出通道—隨機下發(fā)標(biāo)記
具有多個輸出channel的Operator,隨機選擇一個channel通道,將LatencyMarker發(fā)送給它。這可以確保每個LatencyMarker標(biāo)記在系統(tǒng)中只存在一次,并且重新分區(qū)步驟不會導(dǎo)致傳輸?shù)腖atencyMarker數(shù)量激增。
public?class?RecordWriterOutput{
?@Override
?public?void?emitLatencyMarker(LatencyMarker?latencyMarker)?{
??serializationDelegate.setInstance(latencyMarker);
??try?{
???//?內(nèi)部實現(xiàn)了隨機選擇通道
???recordWriter.randomEmit(serializationDelegate);
??}
??catch?(Exception?e)?{
???throw?new?RuntimeException(e.getMessage(),?e);
??}
?}
}
上述RecordWriterOutput#emitLatencyMarker()會被StreamSource、AbstractStreamOperator調(diào)用,分別實現(xiàn)source和中間operator的延遲標(biāo)記下發(fā)。
如果操作符Operator是Sink,它將維護每個已知source實例的最后128個LatencyMarker信息。
Metric展示
每個已知source的最小/最大/平均值/p50/p95/p99時延,在sink的LatencyStats對象中,進行匯總(如果沒有任何輸出的Operator,就是是sink)。
本pr只涉及全鏈路延遲統(tǒng)計的實現(xiàn),F(xiàn)link已有一整套Metric顯示體系,全鏈路時延Metric展示交給Flink框架本身)。
此外,目前還沒有確保系統(tǒng)時鐘同步的機制,因此如果硬件時鐘不正確,則延遲測量將不準(zhǔn)確。
六、時延粒度Granularity說明
時延粒度–概念說明
任意一個中間Operator或Sink,可以通過配置metrics.latency.granularity項,調(diào)整與Source間統(tǒng)計的粒度(Singe、Operator、Subtask):
A、統(tǒng)計的時候,可以選擇source源id、source源subtask index進行組合,調(diào)整統(tǒng)計粒度。
B、統(tǒng)計的時候,當(dāng)前Operator及當(dāng)前Operator subtask index總是參與粒度名稱的生成,固定的。
三種時延跟蹤策略及其源碼定義
Single - 跟蹤延遲,無需區(qū)分:源+源子任務(wù):
(例如雙流Join的兩個source,這里都默認(rèn)為一個數(shù)據(jù)源了)
??SINGLE?{
???String?createUniqueHistogramName(LatencyMarker?marker,?OperatorID?operatorId,?int?operatorSubtaskIndex)?{
????//?只有自己的operatorId和operatorSubtaskIndex參與Metric名稱生成
????//?LatencyMarker帶有的id(源)不參與Metric名稱生成
????return?String.valueOf(operatorId)?+?operatorSubtaskIndex;
???}
??}
Operator - 跟蹤延遲,區(qū)分源,但不區(qū)分源的子任務(wù):
??OPERATOR?{
???String?createUniqueHistogramName(LatencyMarker?marker,?OperatorID?operatorId,?int?operatorSubtaskIndex)?{
???//?LatencyMarker帶有的id(源)中id參與計算
????return?String.valueOf(marker.getOperatorId())?+?operatorId?+?operatorSubtaskIndex;
???}
??}
Subtask - 跟蹤延遲,區(qū)分源+源子任務(wù):
??SUBTASK?{
???String?createUniqueHistogramName(LatencyMarker?marker,?OperatorID?operatorId,?int?operatorSubtaskIndex)?{
????return?String.valueOf(marker.getOperatorId())?+?marker.getSubtaskIndex()?+?operatorId?+?operatorSubtaskIndex;
???}
??}
根據(jù)上述不同的名稱key,將直方圖對象放入Map中,Map定義:
Map?latencyStats?=?new?HashMap<>()
偽代碼(創(chuàng)建直方圖):
latencyHistogram?=?new?DescriptiveStatisticsHistogram(this.historySize);
this.latencyStats.put(uniqueName,?latencyHistogram);
偽代碼(更新直方圖):
long?now?=?System.currentTimeMillis();
latencyHistogram.update(now?-?marker.getMarkedTime())
Single、Operator、Subtask時延策略在Web Metric中的體現(xiàn)
上述Single、Operator 、Subtask不同測試,生成的Metric名稱和group就會產(chǎn)生變化,Web Metric中名稱相應(yīng)改變
一個Subtask時延粒度的Metric路徑:
Job__?.latency
七、總結(jié)說明
LatencyMarker不參與window、MiniBatch的緩存計時,直接被中間Operator下發(fā)。
Metric路徑:TaskManagerJobMetricGroup/operator_id/operator_subtask_index/latency(根據(jù)時延配置粒度Granularity,路徑會有變化,參考本文第六章節(jié))
每個中間Operator、以及Sink都會統(tǒng)計自己與Source節(jié)點的鏈路延遲,我們在監(jiān)控頁面,一般展示Source至Sink鏈路延遲。
延遲粒度細(xì)分到Task,可以用來排查哪臺機器的Task時延偏高,進行對比和運維排查。
從實現(xiàn)原理來看,發(fā)送時延標(biāo)記間隔配置大一些(例如20秒一次),一般不會影響系統(tǒng)處理業(yè)務(wù)數(shù)據(jù)的性能(所有的StreamSource Task都按間隔發(fā)送時延標(biāo)記,中間節(jié)點有多個輸出通道的,隨機選擇一個通道下發(fā),不會復(fù)制多份數(shù)據(jù)出來)。
參考原文:https://blog.csdn.net/LS_ice/article/details/103295774
「PDF」就可以看到阿里云盤下載鏈接了!



Hi,我是王知無,一個大數(shù)據(jù)領(lǐng)域的原創(chuàng)作者。? 放心關(guān)注我,獲取更多行業(yè)的一手消息。

