1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Flink 大規(guī)模作業(yè)調度性能優(yōu)化

        共 7329字,需瀏覽 15分鐘

         ·

        2022-01-22 15:56

        隨著 Flink 流批一體架構不斷演進和升級,越來越多的用戶開始選擇用 Flink 來同時承載實時和離線的業(yè)務。離線業(yè)務和實時業(yè)務有一定差異性,其中比較關鍵的一點是 ——?離線作業(yè)的規(guī)模通常都遠遠大于實時作業(yè)。超大規(guī)模的流批作業(yè)對 Flink 的調度性能提出了新的挑戰(zhàn)。在基于 Flink 1.12 版本部署大規(guī)模流批作業(yè)時,用戶可能會遇到以下瓶頸:


        1. 需要很長時間才能完成作業(yè)的調度和部署;

        2. 需要大量內存來存儲作業(yè)的執(zhí)行拓撲圖以及部署時所需的臨時變量,并且在運行過程中會出現頻繁的長時間 GC,影響集群穩(wěn)定性;



        經測試,對于一個并發(fā)度為 10k 的 word count 作業(yè),在其部署時 JobManager 需要 30 GiB 內存,并且從提交作業(yè)到所有任務節(jié)點部署完畢所需的總時間長達 4 分鐘。

        此外,對于大規(guī)模作業(yè),任務部署的過程可能會長時間阻塞 JobManager 的主線程。當主線程阻塞時,JobManager 無法響應任何來自 TaskManager 的請求。這會使得 TaskManager 心跳超時進而導致作業(yè)出錯失敗。在最壞的情況下,作業(yè)從故障恢復 (Failover) 并進行新一輪部署時又會出現心跳超時,從而導致作業(yè)一直卡在部署階段無法正常運行。


        為了優(yōu)化 Flink 調度大規(guī)模作業(yè)的性能,我們在 Flink 1.13 版本和 1.14 版本進行了以下優(yōu)化:


        1. 針對拓撲結構引入分組概念,優(yōu)化與拓撲相關的計算邏輯,主要包括作業(yè)初始化、Task 調度以及故障恢復時計算需要重啟的 Task 節(jié)點等等。與此同時,該優(yōu)化降低了執(zhí)行拓撲占用的內存空間;

        2. 引入緩存機制優(yōu)化任務部署,優(yōu)化后部署速度更快且所需內存更少;

        3. 基于邏輯拓撲和執(zhí)行拓撲的特性進行優(yōu)化以加快 Pipelined Region 的構建速度,從而降低作業(yè)初始化所需的時間。



        一、性能評測結果




        為了評估優(yōu)化的效果,我們對 Flink 1.12 (優(yōu)化前) 和 Flink 1.14 (優(yōu)化后) 進行了對比測試。測試作業(yè)包含兩個節(jié)點,由全連接邊相連,并發(fā)度均為 10k。為了通過 blob 服務器分發(fā) ShuffleDescriptor,我們將配置項 blob.offload.minsize 的值修改為 100 KiB。該配置項指定了通過 blob 服務器傳輸數據的最小閾值,大小超過該閾值的數據將會通過 Blob 服務器進行傳輸。該配置項的默認值為 1 MiB,而測試作業(yè)中節(jié)點的 ShuffleDescriptor 大小約為 270 KiB。測試結果如表 1 所示:

        表 1 Flink 1.12 和 1.14 各流程時間對比



        1.12

        1.14

        時間降低百分比(%)

        作業(yè)初始化

        11,431ms

        627ms

        94.51%

        任務部署

        63,118ms

        17,183ms

        72.78%

        故障恢復時計算重啟節(jié)點

        37,195ms

        170ms

        99.55%



        除了時間大幅縮短以外,內存占用也明顯降低。在 Flink 1.12 版本上運行測試作業(yè)時,JobManager 需要 30 GiB 內存才能保證作業(yè)穩(wěn)定運行,而在 Flink 1.14 版本上只需要 2 GiB 即可。與此同時,GC 情況也得以改善。在 1.12 版本上,測試作業(yè)在初始化和 Task 部署的過程中都會出現超過 10 秒的長 GC,而在 1.14 版本上均未出現,這意味著心跳超時等問題出現的概率更低,作業(yè)運行更為穩(wěn)定。

        在 1.12 版本上,除去申請資源的時間,測試作業(yè)需要至少 4 分鐘才能部署完成。而作為對比,在 1.14 版本上,除去申請資源的時間,測試作業(yè)在 30 秒內即可完成部署并開始運行。整體所需時間降低了 87%。鑒于此,對于需要部署運行大規(guī)模作業(yè)的用戶,建議將 Flink 版本升級至 1.14 以提升作業(yè)調度和部署性能。

        在接下來的部分中我們將進一步介紹各項優(yōu)化的細節(jié)。

        二、基于拓撲結構的優(yōu)化




        在 Flink 中,分發(fā)模式 (Distribution Pattern) 描述了上游節(jié)點與下游節(jié)點連接的方式,上游節(jié)點計算的結果會按照連邊分發(fā)到下游節(jié)點。目前 Flink 中有兩種分發(fā)模式:點對點 (Pointwise) 和全連接 (All-to-all)。如圖 1 所示,當分發(fā)模式為點對點時,遍歷所有邊的計算復雜度為 O(N);當分發(fā)模式為全連接時,所有下游節(jié)點與上游節(jié)點都有連邊,遍歷所有邊的計算復雜度為 O(N2),所需時間會隨著規(guī)模增大而迅速增長。



        圖 1 目前 Flink 的兩種分發(fā)模式


        Flink 1.12 版本使用執(zhí)行拓撲邊 (ExecutionEdge) 存儲任務節(jié)點間連接的信息。當分發(fā)模式為全連接模式時,節(jié)點間一共會有 O(N2) 條邊相連,當作業(yè)規(guī)模較大時會占用大量內存。對于兩個全連接邊相連且并發(fā)度為 10k 的節(jié)點,其連邊數量為 1 億,總共需要超過 4 GiB 內存來存儲這些連邊。在生產作業(yè)中可能會有多個全連接邊相連的節(jié)點,這也就意味著隨著作業(yè)規(guī)模的增長,所需內存也會大幅增長。

        從圖 1 可以看到,對于全連接邊相連的任務節(jié)點,所有上游節(jié)點所產生的結果分區(qū) (Result Partition) 都是同構的,也就是說這些結果分區(qū)所連接的下游任務節(jié)點都是完全相同的。全連接邊相連的所有下游節(jié)點也都是同構的,因為其所消費的上游分區(qū)都是相同的。鑒于節(jié)點間的 JobEdge 只有一種分發(fā)模式,我們可以按照分發(fā)模式對上游分區(qū)以及下游節(jié)點進行分組。

        對于全連接邊,由于其所有下游節(jié)點都是同構的,我們可以將這些下游節(jié)點劃分為一組,稱為節(jié)點組 (ConsumerVertexGroup),全連接邊相連的所有上游分區(qū)都與這個組連接。同樣,所有同構的上游分區(qū)也被劃分為同一組,稱為分區(qū)組 (ConsumedPartitionGroup),全連接邊相連的所有下游節(jié)點都與這個組相連。優(yōu)化方案的基本思路為:將所有消費相同結果分區(qū)的下游節(jié)點放入同一個節(jié)點組中,同時將所有與相同下游節(jié)點相連的結果分區(qū)放入同一個分區(qū)組中,如圖 2 所示。




        圖 2 兩種分發(fā)模式下如何對結果分區(qū)和任務節(jié)點進行分組

        在調度任務節(jié)點時,Flink 需要遍歷每一個上游分區(qū)和下游節(jié)點間的所有連邊。在優(yōu)化前,由于連邊的總數量為 O(N2),因此將所有邊遍歷一遍的總時間復雜度為 O(N2)。優(yōu)化后,執(zhí)行拓撲邊被分組的概念所替代。鑒于所有同構的分區(qū)都連接到同一個下游節(jié)點組,當 Flink 需要遍歷所有連邊時,只需要將該節(jié)點組遍歷一遍即可,不需要重復遍歷所有節(jié)點,這樣就使得計算復雜度從 O(N2) 降到 O(N)。

        對于點對點的分發(fā)模式,上游結果分區(qū)與下游節(jié)點逐一相連,因此分區(qū)組和節(jié)點組之間也是點對點相連,分組的數量級和執(zhí)行拓撲邊的數量級是一樣的,也就是說,遍歷所有連邊的計算復雜度依舊是 O(N)。

        對于上文我們提到的 word count 作業(yè),采用上述的分組方式取代執(zhí)行拓撲邊可以將執(zhí)行拓撲的內存占用從 4 GiB 降至 12 MiB 左右?;诜纸M的概念,我們對作業(yè)初始化、任務調度以及故障恢復時計算需要重啟的節(jié)點等耗時較長的計算邏輯進行了優(yōu)化。這些計算邏輯都涉及到對上下游之間所有連邊進行遍歷的操作。在優(yōu)化后,其計算復雜度都從 O(N2) 降為 O(N)。

        三、優(yōu)化任務部署


        對于 Flink 1.12 版本,當大規(guī)模作業(yè)內包含全連接邊時,部署所有節(jié)點需要花費很長時間。此外,在部署過程中容易出現 TaskManager 心跳超時的情況,進而導致集群不穩(wěn)定。


        目前任務部署包含以下幾個階段:


        1. JobManager 在主線程內為每一個 Task 創(chuàng)建任務部署描述符 (TaskDeploymentDescriptor,以下簡稱 TDD);

        2. JobManager 在異步線程內將這些 TDD 進行序列化;

        3. JobManager 通過 RPC 通信將序列化后的 TDD 發(fā)送至 TaskManager;

        4. TaskManager 基于 TDD 創(chuàng)建任務并執(zhí)行。



        TDD 包含了 TaskManager 創(chuàng)建任務 (Task) 時所需的所有信息。當任務部署開始時,JobManager 會在主線程內為所有任務節(jié)點創(chuàng)建 TDD。在創(chuàng)建過程中 JobManager 無法響應任何其他請求。對于大規(guī)模作業(yè),這一過程可能會導致 JobManager 主線程長時間被阻塞,進一步導致心跳超時,從而觸發(fā)作業(yè)故障。

        鑒于任務部署時所有 TDD 都是由 JobManager 負責發(fā)送至各 TaskManager,這導致 JobManager 可能會成為性能瓶頸。尤其是對于大規(guī)模作業(yè),部署時產生的 TDD 會占用大量內存空間,導致頻繁的長時間 GC,進一步加重 JobManager 的負擔。

        因此,我們需要縮短創(chuàng)建 TDD 所需的時間,避免心跳超時的發(fā)生。此外,如果能夠縮減 TDD 的大小,網絡傳輸所需的時間也會縮短,這樣可以進一步加快任務部署的速度。


        3.1 為 ShuffleDescriptor 添加緩存機制



        ShuffleDescriptor 用于描述任務在運行時需要消費的上游結果分區(qū)的所有信息。當作業(yè)規(guī)模較大時,ShuffleDescriptor 可能是 TDD 中所占空間最大的一部分。對于全連接邊相連的節(jié)點,當上游節(jié)點和下游節(jié)點的并發(fā)度都是 N 時,每一個下游節(jié)點需要消費 N 個上游結果分區(qū),此時 ShuffleDescriptor 的總數量是 N2。也就是說,計算所有節(jié)點的 ShuffleDescriptor 的時間復雜度為 O(N2)。

        然而,對于同構的下游節(jié)點來說,他們所消費的上游結果分區(qū)是完全一樣的,因此部署時所需要的 ShuffleDescriptor 內容也是一樣的。鑒于此,在部署時不需要為每一個下游節(jié)點重復計算 ShuffleDescriptor,只需要將計算好的 ShuffleDescriptor 放入緩存以供復用即可。這樣計算 TDD 的時間復雜度就可以從 O(N2) 降至 O(N)。

        為了縮減 RPC 消息的大小,進而降低網絡傳輸的開銷,我們可以對 ShuffleDescriptor 進行壓縮。對于上文我們提到的 word count 作業(yè),當節(jié)點并發(fā)度為 10k 時,每一個下游節(jié)點都會有 10k 個 ShuffleDescriptor,在壓縮后其序列化值的總大小降低了 72%。


        3.2 通過 Blob 服務器分發(fā) ShuffleDescriptor



        Blob (Binary Large Object) 以二進制數據的形式存儲大型文件。Flink 通過 blob 服務器在 JobManager 和 TaskManager 之間傳輸體積較大的文件。當 JobManager 需要將大文件傳輸至 TaskManager 時,它可以將文件傳輸至 blob 服務器 (同時會將文件傳輸至分布式文件系統(tǒng)),并且獲得訪問文件所需的 token。當 TaskManager 獲取到 token 時,它們會從分布式文件系統(tǒng) (Distributed File System,DFS) 下載文件。TaskManager 會同時將文件存儲到本地 blob 緩存中方便之后重復讀取。

        在任務部署的過程中,JobManager 負責將 ShuffleDescriptor 通過 RPC 消息分發(fā)到對應的 TaskManager 中。在發(fā)送完成后,RPC 消息會被垃圾回收器回收處理。但當 JobManager 創(chuàng)建 RPC 消息的速度大于發(fā)送的速度時,RPC 消息會逐漸堆積在內存中并且對 GC 造成影響,頻繁觸發(fā)長時間的 GC。這些 GC 會導致 JobManager 停擺,進一步拖慢任務部署的速度。

        為了解決這個問題,Flink 可以通過 blob 服務器來分發(fā)大體積的 ShuffleDescriptor。首先 JobManager 將 ShuffleDescriptor 發(fā)送至 blob 服務器,而 blob 服務器會將 ShuffleDescriptor 存儲至 DFS 中,TaskManager 在開始處理 TDD 時會從 DFS 下載數據。這樣 JobManager 不需要將所有 ShuffleDescriptor 始終存儲在內存中直至對應的 RPC 消息發(fā)出。經過優(yōu)化后,在部署大規(guī)模作業(yè)時長時間 GC 的頻率會明顯降低。且鑒于 DFS 為 TaskManager 提供了多個分布式節(jié)點下載數據,JobManager 網絡傳輸的壓力也得以緩解,不再成為瓶頸,這樣可以加快任務部署的速度。




        圖 3 JobManager 將 ShuffleDescriptor 分發(fā)至 TaskManager

        為了避免緩存過多導致本地磁盤空間不足,當 ShuffleDescriptor 所對應的結果分區(qū)被釋放時,在 blob 服務器上存儲的對應緩存會被清理。此外我們?yōu)?TaskManager 上 ShuffleDescriptor 的緩存添加了總大小的限制。當緩存超過一定大小時,緩存會按照最近最少使用 (LRU) 的順序移除。這樣可以保證本地磁盤不會被緩存占滿,特別是對于 session 模式運行的集群。

        四、針對 Pipelined Region 構建的優(yōu)化


        目前 Flink 中節(jié)點間有兩種數據交換類型:pipelined 和 blocking。對于 blocking 的數據交換方式,結果分區(qū)會在上游全部計算完成后再交由下游進行消費,數據會持久化到本地,支持多次消費。對于 pipelined 數據交換,上游結果分區(qū)的產出和下游任務節(jié)點的消費是同時進行的,所有數據不會被持久化且只能讀取一次。

        鑒于 pipelined 的數據流產出和消費同時發(fā)生,Flink 需要保證 pipelined 邊相連的上下游節(jié)點同時運行。由 pipelined 邊相連的節(jié)點構成了一個 region,被稱為 Pipelined Region (以下簡稱 region)。在 Flink 中,region 是任務調度和 Failover 的基本單位。在調度的過程中,同一 region 內的所有 Task 節(jié)點都會被同時調度,而整個拓撲中所有 region 會按照拓撲順序逐一進行調度。

        目前在 Flink 的調度層面有兩種 region:邏輯層面的 Logical Pipelined Region 以及執(zhí)行調度層面的 Scheduling Pipelined Region。邏輯 region 由邏輯拓撲 (JobGraph) 中的節(jié)點 JobVertex 構成,而執(zhí)行 region 則由執(zhí)行拓撲 (ExecutionGraph) 中的節(jié)點 ExecutionVertex 構成。類似于 ExecutionVertex 基于 JobVertex 計算產生,執(zhí)行 region 是由邏輯 region 計算得到的,如圖 4 所示。




        圖 4 邏輯 region 以及執(zhí)行 region

        在構建 region 的過程中會遇到一個問題:region 之間可能存在環(huán)形依賴。對于當前 region,當且僅當其所消費的上游 region 都產出全部數據后才能進行調度。如果兩個?region?之間存在環(huán)形依賴,那么就會出現調度死鎖:兩個?region?都需要等對方完成才能調度,最終兩個?region?都無法被調度起來。因此,Flink 通過 Tarjan 強連通分量算法來發(fā)現環(huán)形依賴,并將具有環(huán)形依賴的?region?合并成一個?region,這樣就能解決調度死鎖的問題。Tarjan 強連通分量算法需要遍歷拓撲內的所有邊,而對于全連接的分發(fā)模式來說,其邊的數量為 O(N2),因此算法整體的計算復雜度為 O(N2),隨著規(guī)模變大會顯著增長,從而影響大規(guī)模作業(yè)初始化的時間。




        圖 5 具有調度死鎖的拓撲

        為了加快 region 的構建速度,我們可以基于邏輯拓撲和執(zhí)行拓撲之間的關聯進行優(yōu)化。鑒于一個執(zhí)行 region 只能由一個邏輯 region 中的節(jié)點派生,不會出現跨?region?的情況,Flink 在初始化作業(yè)時只需要遍歷所有邏輯 region 并逐一轉換成執(zhí)行 region 即可。轉換的方式跟分發(fā)模式相關。如果邏輯 region 內的節(jié)點間有任何全連接邊,則整個邏輯 region 可以直接轉換成一個執(zhí)行 region。




        圖 6 如何將邏輯 region 轉換成執(zhí)行 region

        如果全連接邊采用的是 pipelined 數據交換,所有與之相連的上下游節(jié)點都必須同時運行,也就是說全連接邊所連接的所有?region?都要合并成一個?region。如果全連接邊采用的是 blocking 數據交換,則會引入環(huán)形依賴,如圖 5 所示。在這種情況下所有與之相連的?region?都必須合并以避免調度死鎖,如圖 6 所示。鑒于只要有全連接邊就直接生成一整個執(zhí)行 region,在這種情況下不需要用 Tarjan 算法,整體計算復雜度只需要 O(N) 即可。

        如果在邏輯 region 內,所有節(jié)點間都只有點對點的分發(fā)模式,那么 Flink 依舊直接用 Tarjan 算法來檢測環(huán)形依賴,鑒于點對點的分發(fā)模式其邊數為 O(N),算法的時間復雜度也只有 O(N)。

        在優(yōu)化后,將邏輯 region 轉換成執(zhí)行 region 的整體計算復雜度從 O(N2) 降為 O(N)。經測試,對于上文提到的 word count 作業(yè),當兩個節(jié)點間的連邊為全連接邊且數據交換方式為 blocking 時,構建 region 的總時間降低了 99%,從 8,257ms 降至 120ms。

        瀏覽 33
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            美女一级毛片老司机 | 两根一前一后挺进她的身体视频 | 日本三级黄色网址 | 麻豆精品无码国产 | 日本操B 国产日B大白奶美女 | 欧美老妇一区二区三区 | 伊人五月丁香 | 国产精品99久久久久久噜噜 | 无码性爱在线观看 | 黄色成人网站入口 |