1. Flink 原理 | Flink 新一代流計(jì)算和容錯(cuò)——階段總結(jié)和展望

        共 6694字,需瀏覽 14分鐘

         ·

        2022-03-06 03:31

        ▼ 關(guān)注「Apache Flink」,獲取更多技術(shù)干貨?

        摘要本文整理自 Apache Flink 引擎架構(gòu)師、阿里巴巴存儲(chǔ)引擎團(tuán)隊(duì)負(fù)責(zé)人梅源在 Flink Forward Asia 2021 核心技術(shù)專(zhuān)場(chǎng)的演講。本次演講內(nèi)容圍繞 Flink 的高可用性探討 Flink 新一代流計(jì)算的核心問(wèn)題和技術(shù)選型,包括:


        1. Flink 高可用流計(jì)算的關(guān)鍵路徑
        2. 容錯(cuò) (Fault Tolerance) 2.0 及關(guān)鍵問(wèn)題
        3. 數(shù)據(jù)恢復(fù)過(guò)程
        4. 穩(wěn)定快速高效的 Checkpointing
        5. 云原生下容錯(cuò)和彈性擴(kuò)縮容

        Tips:點(diǎn)擊「閱讀原文」查看原文視頻 & 演講PDF~

        一、高可用流計(jì)算的關(guān)鍵路徑



        上圖的雙向軸線(xiàn)是大數(shù)據(jù)應(yīng)用隨時(shí)間延遲的圖譜,越往右邊時(shí)間延遲要求越短,越往左延遲要求沒(méi)那么高。Flink 誕生之初大概是在上圖中間,可以理解為往右對(duì)應(yīng)的是流式計(jì)算,而往左邊對(duì)應(yīng)的是批式計(jì)算。過(guò)去一兩年,F(xiàn)link 的應(yīng)用圖譜向左邊有了很大的擴(kuò)展,也就是我們常說(shuō)的流批一體;與此同時(shí)我們也從來(lái)沒(méi)有停止過(guò)把圖譜向更實(shí)時(shí)的方向推進(jìn)。

        Flink 是以流式計(jì)算起家,那么向更實(shí)時(shí)的方向推進(jìn)到底是指什么?什么是更實(shí)時(shí)更極致的流式計(jì)算?

        在正常處理的情況下,F(xiàn)link 引擎框架本身除了定期去做 Checkpoint 的快照,幾乎沒(méi)有其他額外的開(kāi)銷(xiāo),而且 Checkpoint 快照很大一部分是異步的,所以正常處理下 Flink 是非常高效的,端到端的延遲在 100 毫秒左右。正因?yàn)橐С指咝У奶幚恚現(xiàn)link 在做容錯(cuò)恢復(fù)和 Rescale 的時(shí)候代價(jià)都會(huì)比較大:需要把整個(gè)作業(yè)停掉,然后從過(guò)去的快照檢查點(diǎn)整體恢復(fù),這個(gè)過(guò)程大概需要幾秒鐘,在作業(yè)狀態(tài)比較大的情況下會(huì)達(dá)到分鐘級(jí)。如果需要預(yù)熱或啟動(dòng)其他服務(wù)進(jìn)程,時(shí)間就更長(zhǎng)了。

        所以,F(xiàn)link 極致流計(jì)算的關(guān)鍵點(diǎn)在容錯(cuò)恢復(fù)部分。這里說(shuō)的極致的流計(jì)算是指對(duì)延遲性、穩(wěn)定性和一致性都有一定要求的場(chǎng)景,比如風(fēng)控安全。這也是 Fault Tolerance 2.0 要解決的問(wèn)題。

        二、容錯(cuò) (Fault Tolerance) 2.0

        及關(guān)鍵問(wèn)題



        容錯(cuò)恢復(fù)是一個(gè)全鏈路的問(wèn)題,包括 failure detect、job cancel、新的資源申請(qǐng)調(diào)度、狀態(tài)恢復(fù)和重建等。同時(shí),如果想從已有的狀態(tài)恢復(fù),就必須在正常處理過(guò)程中做 Checkpoint,并且將它做得足夠輕量化才不會(huì)影響正常處理。

        容錯(cuò)也是多維度的問(wèn)題,不同的用戶(hù)、不同的場(chǎng)景對(duì)容錯(cuò)都有不同需求,主要包括以下幾個(gè)方面:


        • 數(shù)據(jù)一致性 (Data Consistency),有些應(yīng)用比如在線(xiàn)機(jī)器學(xué)習(xí)是可以容忍部分?jǐn)?shù)據(jù)丟失;

        • 延遲 (Latency),某些場(chǎng)景對(duì)端到端的延遲要求沒(méi)那么高,所以可以將正常處理和容錯(cuò)恢復(fù)的時(shí)候要做的工作綜合平均一下;

        • 恢復(fù)時(shí)的行為表現(xiàn) (Recovery Behavior),比如大屏或者報(bào)表實(shí)時(shí)更新的場(chǎng)景下,可能并不需要迅速全量恢復(fù),更重要的在于迅速恢復(fù)第一條數(shù)據(jù);

        • 代價(jià) (Cost),用戶(hù)根據(jù)自己的需求,愿意為容錯(cuò)付出的代價(jià)也不一樣。綜上,我們需要從不同的角度去考慮這個(gè)問(wèn)題。

        另外,容錯(cuò)也不僅僅是 Flink 引擎?zhèn)鹊膯?wèn)題。Flink 和云原生的結(jié)合是 Flink 未來(lái)的重要方向,我們對(duì)于云原生的依賴(lài)方式也決定了容錯(cuò)的設(shè)計(jì)和走向。我們期望通過(guò)非常簡(jiǎn)單的弱依賴(lài)來(lái)利用云原生帶來(lái)的便利,比如 across region durability,最終能夠?qū)⒂袪顟B(tài)的 Flink 的應(yīng)用像原生的無(wú)狀態(tài)應(yīng)用一樣彈性部署。

        基于以上考慮,我們?cè)?Flink 容錯(cuò) 2.0 工作也有不同的側(cè)重點(diǎn)和方向。

        第一,從調(diào)度的角度來(lái)考慮,每次錯(cuò)誤恢復(fù)的時(shí)候,不會(huì)把和全局快照相對(duì)應(yīng)的所有 task 節(jié)點(diǎn)都回滾,而是只恢復(fù)失敗的單個(gè)或者部分節(jié)點(diǎn),這個(gè)對(duì)需要預(yù)熱或單個(gè)節(jié)點(diǎn)初始化時(shí)間很長(zhǎng)的場(chǎng)景是很有必要的,比如在線(xiàn)機(jī)器學(xué)習(xí)場(chǎng)景。與此相關(guān)的一些工作比如 Approximate Task-local Recovery 已在 VVP 上線(xiàn);Exactly-once Task-local Recovery,我們也已經(jīng)取得了一些成果。

        接下來(lái)重點(diǎn)聊一下 Checkpoint 以及和云原生相關(guān)的部分。

        三、Flink 中的數(shù)據(jù)恢復(fù)過(guò)程


        那么,容錯(cuò)到底解決了什么?在我看來(lái)其本質(zhì)是解決數(shù)據(jù)恢復(fù)的問(wèn)題。


        Flink 的數(shù)據(jù)可以粗略分為以下三類(lèi),第一種是元信息,相當(dāng)于一個(gè) Flink 作業(yè)運(yùn)行起來(lái)所需要的最小信息集合,包括比如 Checkpoint 地址、Job Manager、Dispatcher、Resource Manager 等等,這些信息的容錯(cuò)是由 Kubernetes/Zookeeper 等系統(tǒng)的高可用性來(lái)保障的,不在我們討論的容錯(cuò)范圍內(nèi)。Flink 作業(yè)運(yùn)行起來(lái)以后,會(huì)從數(shù)據(jù)源讀取數(shù)據(jù)寫(xiě)到 Sink 里,中間流過(guò)的數(shù)據(jù)稱(chēng)為處理的中間數(shù)據(jù) Inflight Data (第二類(lèi))。對(duì)于有狀態(tài)的算子比如聚合算子,處理完輸入數(shù)據(jù)會(huì)產(chǎn)生算子狀態(tài)數(shù)據(jù) (第三類(lèi))。

        Flink 會(huì)周期性地對(duì)所有算子的狀態(tài)數(shù)據(jù)做快照,上傳到持久穩(wěn)定的海量存儲(chǔ)中 (Durable Bulk Store),這個(gè)過(guò)程就是做 Checkpoint。Flink 作業(yè)發(fā)生錯(cuò)誤時(shí),會(huì)回滾到過(guò)去的一個(gè)快照檢查點(diǎn) Checkpoint 恢復(fù)。

        我們當(dāng)前有非常多的工作是針對(duì)提升 Checkpointing 效率來(lái)做的,因?yàn)樵趯?shí)際工作中,引擎層大部分 Oncall 或工單問(wèn)題基本上都與 Checkpoint 相關(guān),各種原因會(huì)造成 Checkpointing 超時(shí)。

        下面簡(jiǎn)單回顧一下 Checkpointing 的流程,對(duì)這部分內(nèi)容比較熟悉的同學(xué)可以直接跳過(guò)。Checkpointing 的流程分為以下幾步:


        第一步:Checkpoint Coordinate 從 Source 端插入 Checkpoint Barrier (上圖黃色的豎條)。


        第二步:Barrier 會(huì)隨著中間數(shù)據(jù)處理向下游流動(dòng),流過(guò)算子的時(shí)候,系統(tǒng)會(huì)給算子的當(dāng)前狀態(tài)做一個(gè)同步快照,并將這個(gè)快照數(shù)據(jù)異步上傳到遠(yuǎn)端存儲(chǔ)。這樣一來(lái),Barrier 之前所有的輸入數(shù)據(jù)對(duì)算子的影響都已反映在算子的狀態(tài)中了。如果算子狀態(tài)很大,會(huì)影響完成 Checkpointing 的時(shí)間。


        第三步:當(dāng)一個(gè)算子有多個(gè)輸入的時(shí)候,需要算子拿到所有輸入的 Barrier 之后才能開(kāi)始做快照,也就是上圖藍(lán)色框的部分。可以看到,如果在對(duì)齊過(guò)程中有反壓,造成中間處理數(shù)據(jù)流動(dòng)緩慢,沒(méi)有反壓的那些線(xiàn)路也會(huì)被堵住,Checkpoint 會(huì)做得很慢,甚至做不出來(lái)。


        第四步:所有算子的中間狀態(tài)數(shù)據(jù)都成功上傳到遠(yuǎn)端穩(wěn)定存儲(chǔ)之后, 一個(gè)完整的 Checkpoint 才算真正完成。

        從這 4 個(gè)步驟中可以看到,影響快速穩(wěn)定地做 Checkpoint 的因素主要有 2 個(gè),一個(gè)是處理的中間數(shù)據(jù)流動(dòng)緩慢,另一個(gè)是算子狀態(tài)數(shù)據(jù)過(guò)大,造成上傳緩慢,下面來(lái)講一講如何來(lái)解決這兩個(gè)因素。

        四、穩(wěn)定快速高效的 Checkpointing



        針對(duì)中間數(shù)據(jù)流動(dòng)緩慢,可以:


        1. 想辦法不被中間數(shù)據(jù)堵塞:Unaligned Checkpoint——直接跳過(guò)阻塞的中間數(shù)據(jù);

        2. 或者讓中間的數(shù)據(jù)變得足夠少:Buffer Debloating。

        3. 針對(duì)狀態(tài)數(shù)據(jù)過(guò)大,我們需要將每次做 Checkpoint 時(shí)上傳的數(shù)據(jù)狀態(tài)變得足夠小:Generalized Log-Based Incremental Checkpoint。

        下面來(lái)具體展開(kāi)闡述每一種解決方法。

        4.1 Unaligned Checkpoint



        Unaligned Checkpoint 的原理是將從 Source 插入的 Barrier 跳過(guò)中間數(shù)據(jù)瞬時(shí)推到 Sink,跳過(guò)的數(shù)據(jù)一起放在快照里。所以對(duì)于 Unaligned Checkpoint 來(lái)說(shuō),它的狀態(tài)數(shù)據(jù)不僅包括算子的狀態(tài)數(shù)據(jù),還包括處理的中間數(shù)據(jù),可以理解成給整個(gè) Flink Pipeline 做了一個(gè)完整的瞬時(shí)快照,如上圖黃色框所示。雖然 Unaligned Checkpoint 可以非常快速地做 Checkpoint,但它需要存儲(chǔ)額外的 Pipeline Channel 的中間數(shù)據(jù),所以需要存儲(chǔ)的狀態(tài)會(huì)更大。Unaligned Checkpoint 在去年 Flink-1.11 版本就已經(jīng)發(fā)布,F(xiàn)link-1.12 和 1.13 版本支持 Unaligned Checkpoint 的 Rescaling 和動(dòng)態(tài)由 Aligned Checkpoint 到 Unaligned Checkpoint 的切換。

        4.2 Buffer Debloating


        Buffer Debloating 的原理是在不影響吞吐和延遲的前提下,縮減上下游緩存的數(shù)據(jù)。經(jīng)過(guò)觀察,我們發(fā)現(xiàn)算子并不需要很大的 input/output buffer。緩存太多數(shù)據(jù)除了讓作業(yè)在數(shù)據(jù)流動(dòng)緩慢時(shí)把整個(gè) pipeline 填滿(mǎn),讓作業(yè)內(nèi)存超用 OOM 以外,沒(méi)有太大的幫助。


        這里可以做個(gè)簡(jiǎn)單的估算,對(duì)于每個(gè) task,無(wú)論是輸出還是輸入,我們總的 buffer 數(shù)目大概是每個(gè) channel 對(duì)應(yīng)的 exclusive buffer 數(shù)乘以 channel 的個(gè)數(shù)再加上公用的 floating buffer 數(shù)。這個(gè) buffer 總數(shù)再乘以每個(gè) buffer 的 size,得到的結(jié)果就是總的 local buffer pool 的 size。然后我們可以把系統(tǒng)默認(rèn)值代進(jìn)去算一下,就會(huì)發(fā)現(xiàn)并發(fā)稍微大一點(diǎn)再多幾次數(shù)據(jù) shuffle,整個(gè)作業(yè)中間的流動(dòng)數(shù)據(jù)很容易就會(huì)達(dá)到幾個(gè) Gigabytes。

        實(shí)際中我們并不需要緩存這么多數(shù)據(jù),只需要足夠量的數(shù)據(jù)保證算子不空轉(zhuǎn)即可,這正是 Buffer Debloating 做的事情。Buffer Debloating 能夠動(dòng)態(tài)調(diào)整上下游總 buffer 的大小,在不影響性能的情況下最小化作業(yè)所需的 buffer size。目前的策略是上游會(huì)動(dòng)態(tài)緩存下游大概一秒鐘能夠處理的數(shù)據(jù)。此外,Buffer Debloating 對(duì) Unaligned Checkpoint 也是有好處的。因?yàn)?Buffer Debloating 減少了中間流動(dòng)的數(shù)據(jù),所以 Unaligned Checkpoint 在做快照的時(shí)候,需要額外存儲(chǔ)的中間數(shù)據(jù)也會(huì)變少。


        上圖是對(duì) Buffer Debloating 在反壓的情況下,Checkpointing 時(shí)間隨 Debloat Target 變化的時(shí)間對(duì)比圖。Debloat Target 是指上游緩存 “預(yù)期時(shí)間” 內(nèi)下游能處理的數(shù)據(jù)。這個(gè)實(shí)驗(yàn)中,F(xiàn)link 作業(yè)共有 5 個(gè) Network Exchange,所以總共 Checkpointing 所需的時(shí)間大約等于 5 倍的 Debloat Target,這與實(shí)驗(yàn)結(jié)果也基本一致。

        4.3 Generalized Log-Based Incremental Checkpoint


        前面提到狀態(tài)大小也會(huì)影響完成 Checkpointing 的時(shí)間,這是因?yàn)?Flink 的 Checkpointing 過(guò)程由兩個(gè)部分組成:同步的快照和異步上傳。同步的過(guò)程通常很快,把內(nèi)存中的狀態(tài)數(shù)據(jù)刷到磁盤(pán)上就可以了。但是異步上傳狀態(tài)數(shù)據(jù)的部分和上傳的數(shù)據(jù)量有關(guān),因此我們引入了 Generalized Log-Based Incremental Checkpoint 來(lái)控制每次快照時(shí)需要上傳的數(shù)據(jù)量。


        對(duì)于有狀態(tài)的算子,它的內(nèi)部狀態(tài)發(fā)生改變后,這個(gè)更新會(huì)記錄在 State Table 里,如上圖所示。當(dāng) Checkpointing 發(fā)生的時(shí)候,以 RocksDB 為例,這個(gè) State Table 會(huì)被刷到磁盤(pán)上,磁盤(pán)文件再異步上傳到遠(yuǎn)端存儲(chǔ)。根據(jù) Checkpoint 的模式,上傳的部分可以是完整的 Checkpoint 或 Checkpoint 增量部分。但無(wú)論是哪種模式,它上傳文件的大小都是與 State Backend 存儲(chǔ)實(shí)現(xiàn)強(qiáng)綁定的。例如 RocksDB 雖然也支持增量 Checkpoint,但是一旦觸發(fā)多層 Compaction,就會(huì)生成很多新的文件,而這種情況下增量的部分甚至?xí)纫粋€(gè)完整的 Checkpoint 更大,所以上傳時(shí)間依然不可控。


        既然是上傳過(guò)程導(dǎo)致 Checkpointing 超時(shí),那么把上傳過(guò)程從 Checkpointing 過(guò)程中剝離開(kāi)來(lái)就能解決問(wèn)題。這其實(shí)就是 Generalized Log-Based Incremental Checkpoint 想要做的事情:本質(zhì)上就是將 Checkpointing 過(guò)程和 State Backend 存儲(chǔ) Compaction 完全剝離開(kāi)。

        具體實(shí)現(xiàn)方法如下:對(duì)于一個(gè)有狀態(tài)的算子,我們除了將狀態(tài)更新記錄在 State Table 里面,還會(huì)再寫(xiě)一份增量到 State Changelog,并將它們都異步的刷到遠(yuǎn)端存儲(chǔ)上。這樣,Checkpoint 變成由兩個(gè)部分組成,第一個(gè)部分是當(dāng)前已經(jīng)物化存在遠(yuǎn)端存儲(chǔ)上的 State Table,第二個(gè)部分是還沒(méi)有物化的增量部分。因此真正做 Checkpoint 的時(shí)候,需要上傳的數(shù)據(jù)量就會(huì)變得少且穩(wěn)定,不僅可以把 Checkpoint 做得更穩(wěn)定,還可以做得更高頻。可以極大縮短端到端的延遲。特別對(duì)于 Exactly Once Sink,因?yàn)樾枰瓿赏暾?Checkpoint 以后才能完成二階段提交。

        五、云原生下容錯(cuò)和彈性擴(kuò)縮容


        在云原生的大背景下,快速擴(kuò)縮容是 Flink 的一大挑戰(zhàn),特別是 Flink-1.13 版本引入了 Re-active Scaling 模式后,F(xiàn)link 作業(yè)需要頻繁做 Scaling-In/Out,因此 Rescaling 已成為 Re-active 的主要瓶頸。Rescaling 和容錯(cuò) (Failover) 要解決的問(wèn)題在很大程度上是類(lèi)似的:例如拿掉一臺(tái)機(jī)器后,系統(tǒng)需要快速感知到,需要重新調(diào)度并且重新恢復(fù)狀態(tài)等。當(dāng)然也有不同點(diǎn),F(xiàn)ailover 的時(shí)候只需要恢復(fù)狀態(tài),將狀態(tài)拉回到算子上即可;但 Rescaling 的時(shí)候,因?yàn)橥負(fù)鋾?huì)導(dǎo)致并行度發(fā)生變化,需要重新分配狀態(tài)。


        狀態(tài)恢復(fù)的時(shí)候,我們首先需要將狀態(tài)數(shù)據(jù)從遠(yuǎn)端存儲(chǔ)讀取到本地,然后根據(jù)讀取的數(shù)據(jù)重新分配狀態(tài)。如上圖所示,整個(gè)這個(gè)過(guò)程在狀態(tài)稍大的情況下,單個(gè)并發(fā)都會(huì)超過(guò) 30 分鐘。并且在實(shí)際中,我們發(fā)現(xiàn)狀態(tài)重新分配所需要的時(shí)間遠(yuǎn)遠(yuǎn)大于從遠(yuǎn)端存儲(chǔ)讀取狀態(tài)數(shù)據(jù)的時(shí)間。


        那么狀態(tài)是如何重新分配的呢?Flink 的狀態(tài)用 Key Group 作為最小單位來(lái)切分,可以理解成把狀態(tài)的 Key Space 映射到一個(gè)從 0 開(kāi)始的正整數(shù)集,這個(gè)正整數(shù)集就是 Key Group Range。這個(gè) Key Group Range 和算子的所允許的最大并發(fā)度相關(guān)。如上圖所示,當(dāng)我們把算子并發(fā)度從 3 變成 4 的時(shí)候,重新分配的 Task1 的狀態(tài)是分別由原先的兩個(gè) Task 狀態(tài)的一部分拼接而成的,并且這個(gè)拼接狀態(tài)是連續(xù)且沒(méi)有交集的,所以我們可以利用這一特性做一些優(yōu)化。


        上圖可以看到優(yōu)化后,DB Rebuild 這部分優(yōu)化效果還是非常明顯的,但目前這部分工作還處于探索性階段,有很多問(wèn)題尚未解決,所以暫時(shí)還沒(méi)有明確的社區(qū)計(jì)劃。

        最后簡(jiǎn)單回顧一下本文的內(nèi)容。我們首先討論了為什么要做容錯(cuò),因?yàn)槿蒎e(cuò)是 Flink 流計(jì)算的關(guān)鍵路徑;然后分析了影響容錯(cuò)的因素,容錯(cuò)是一個(gè)全鏈路的問(wèn)題,包括 Failure Detection、Job Canceling、新的資源申請(qǐng)調(diào)度、狀態(tài)恢復(fù)和重建等,需要從多個(gè)維度去權(quán)衡思考這個(gè)問(wèn)題;當(dāng)前我們的重點(diǎn)主要是放在如何穩(wěn)定快速做 Checkpoint 的部分,因?yàn)楝F(xiàn)在很多實(shí)際的問(wèn)題都和做 Checkpoint 相關(guān);最后我們討論了如何將容錯(cuò)放在云原生的大背景下與彈性擴(kuò)縮容相結(jié)合的一些探索性工作。

        往期精選


        更多 Flink 相關(guān)技術(shù)問(wèn)題,可掃碼加入社區(qū)釘釘交流群~

        ???戳我,查看原文視頻&演講PDF~

        瀏覽 71
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
          
          

            1. 操碰97 | 99操逼| 大荫蒂视频另类XX | 日本一級片黃色一級九抓直播 | 人人草天天草 |