1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Flink SQL 在字節(jié)跳動(dòng)的優(yōu)化與實(shí)踐

        共 6964字,需瀏覽 14分鐘

         ·

        2021-03-14 17:47

        整理 | Aven (Flink 社區(qū)志愿者)

        摘要:文由 Apache Flink Committer,字節(jié)跳動(dòng)架構(gòu)研發(fā)工程師李本超分享,以四個(gè)章節(jié)來(lái)介紹 Flink 在字節(jié)的應(yīng)用實(shí)戰(zhàn)。 內(nèi)容如下:


        1. 整體介紹

        2. 實(shí)踐優(yōu)化

        3. 流批一體

        4. 未來(lái)規(guī)劃


        Tips:點(diǎn)擊文末「閱讀原文」可查看作者原版分享視頻~


        一、整體介紹



        2018 年 12 月 Blink 宣布開(kāi)源,經(jīng)歷了約一年的時(shí)間 Flink 1.9 于 2019 年 8 月 22 發(fā)布。在 Flink 1.9 發(fā)布之前字節(jié)跳動(dòng)內(nèi)部基于 master 分支進(jìn)行內(nèi)部的 SQL 平臺(tái)構(gòu)建。經(jīng)歷了 2~3 個(gè)月的時(shí)間字節(jié)內(nèi)部在 19 年 10 月份發(fā)布了基于 Flink 1.9 的 Blink planner 構(gòu)建的 Streaming SQL 平臺(tái),并進(jìn)行內(nèi)部推廣。在這個(gè)過(guò)程中發(fā)現(xiàn)了一些比較有意思的需求場(chǎng)景,以及一些較為奇怪的 BUG。


        基于 1.9 的 Flink SQL 擴(kuò)展


        雖然最新的 Flink 版本已經(jīng)支持 SQL 的 DDL,但 Flink 1.9 并不支持。字節(jié)內(nèi)部基于 Flink 1.9 進(jìn)行了 DDL 的擴(kuò)展支持以下語(yǔ)法:

        1. create table
        2. create view
        3. create function
        4. add resource

        同時(shí) Flink 1.9 版本不支持的 watermark 定義在 DDL 擴(kuò)展后也支持了。

        我們?cè)谕扑]大家盡量的去用 SQL 表達(dá)作業(yè)時(shí)收到很多“SQL 無(wú)法表達(dá)復(fù)雜的業(yè)務(wù)邏輯”的反饋。時(shí)間久了發(fā)現(xiàn)其實(shí)很多用戶所謂的復(fù)雜業(yè)務(wù)邏輯有的是做一些外部的 RPC 調(diào)用,字節(jié)內(nèi)部針對(duì)這個(gè)場(chǎng)景做了一個(gè) RPC 的維表和 sink,讓用戶可以去讀寫(xiě) RPC 服務(wù),極大的擴(kuò)展了 SQL 的使用場(chǎng)景,包括 FaaS 其實(shí)跟 RPC 也是類似的。在字節(jié)內(nèi)部添加了 Redis/Abase/Bytable/ByteSQL/RPC/FaaS 等維表的支持。

        同時(shí)還實(shí)現(xiàn)了多個(gè)內(nèi)部使用的 connectors:

        1. source: RocketMQ
        2. sink: RocketMQ/ClickHouse/Doris/LogHouse/Redis/Abase/Bytable/ByteSQL/RPC/Print/Metrics

        并且為 connector 開(kāi)發(fā)了配套的 format:PB/Binlog/Bytes。

        在線的界面化 SQL 平臺(tái)



        除了對(duì) Flink 本身功能的擴(kuò)展,字節(jié)內(nèi)部也上線了一個(gè) SQL 平臺(tái),支持以下功能:


        • SQL 編輯
        • SQL 解析
        • SQL 調(diào)試
        • 自定義 UDF 和 Connector
        • 版本控制
        • 任務(wù)管理

        二、實(shí)踐優(yōu)化


        除了對(duì)功能的擴(kuò)展,針對(duì) Flink 1.9 SQL 的不足之處也做了一些優(yōu)化。

        Window 性能優(yōu)化


        1、支持了 window Mini-Batch

        Mini-Batch 是 Blink planner 的一個(gè)比較有特色的功能,其主要思想是積攢一批數(shù)據(jù),再進(jìn)行一次狀態(tài)訪問(wèn),達(dá)到減少訪問(wèn)狀態(tài)的次數(shù)降低序列化反序列化的開(kāi)銷。這個(gè)優(yōu)化主要是在 RocksDB 的場(chǎng)景。如果是 Heap 狀態(tài) Mini-Batch 并沒(méi)什么優(yōu)化。在一些典型的業(yè)務(wù)場(chǎng)景中,得到的反饋是能減少 20~30% 左右的 CPU 開(kāi)銷。

        2、擴(kuò)展 window 類型

        目前 SQL 中的三種內(nèi)置 window,滾動(dòng)窗口、滑動(dòng)窗口、session 窗口,這三種語(yǔ)意的窗口無(wú)法滿足一些用戶場(chǎng)景的需求。比如在直播的場(chǎng)景,分析師想統(tǒng)計(jì)一個(gè)主播在開(kāi)播之后,每一個(gè)小時(shí)的 UV(Unique Visitor)、GMV(Gross Merchandise Volume) 等指標(biāo)。自然的滾動(dòng)窗口的劃分方式并不能夠滿足用戶的需求,字節(jié)內(nèi)部就做了一些定制的窗口來(lái)滿足用戶的一些共性需求。

        -- my_window 為自定義的窗口,滿足特定的劃分方式SELECTroom_id,COUNT(DISTINCT user_id)FROM MySourceGROUP BYroom_id,my_window(ts, INTERVAL '1' HOURS)

        3、window offset

        這是一個(gè)較為通用的功能,在 Datastream API 層是支持的,但 SQL 中并沒(méi)有。這里有個(gè)比較有意思的場(chǎng)景,用戶想要開(kāi)一周的窗口,一周的窗口變成了從周四開(kāi)始的非自然周。因?yàn)檎l(shuí)也不會(huì)想到 1970 年 1 月 1 號(hào)那天居然是周四。在加入了 offset    的支持后就可以支持正確的自然周窗口。

        SELECTroom_id,COUNT(DISTINCT user_id)FROM MySourceGROUP BYroom_id,TUMBLE(ts, INTERVAL '7' DAY, INTERVAL '3', DAY)

        維表優(yōu)化


        1、延遲 Join

        維表 Join 的場(chǎng)景下因?yàn)榫S表經(jīng)常發(fā)生變化尤其是新增維度,而 Join 操作發(fā)生在維度新增之前,經(jīng)常導(dǎo)致關(guān)聯(lián)不上。 

        所以用戶希望如果 Join 不到,則暫時(shí)將數(shù)據(jù)緩存起來(lái)之后再進(jìn)行嘗試,并且可以控制嘗試次數(shù),能夠自定義延遲 Join 的規(guī)則。這個(gè)需求場(chǎng)景不單單在字節(jié)內(nèi)部,社區(qū)的很多同學(xué)也有類似的需求。

        基于上面的場(chǎng)景實(shí)現(xiàn)了延遲 Join 功能,添加了一個(gè)可以支持延遲 Join 維表的算子。當(dāng) Join 沒(méi)有命中,local cache 不會(huì)緩存空的結(jié)果,同時(shí)將數(shù)據(jù)暫時(shí)保存在一個(gè)狀態(tài)中,之后根據(jù)設(shè)置定時(shí)器以及它的重試次數(shù)進(jìn)行重試。



        2、維表 Keyby 功能



        通過(guò)拓?fù)湮覀儼l(fā)現(xiàn) Cacl 算子和 lookUpJoin 算子是 chain 在一起的。因?yàn)樗鼪](méi)有一個(gè) key 的語(yǔ)義。 

        當(dāng)作業(yè)并行度比較大,每一個(gè)維表 Join 的 subtask,訪問(wèn)的是所有的緩存空間,這樣對(duì)緩存來(lái)說(shuō)有很大的壓力。

        但觀察 Join 的 SQL,等值 Join 是天然具有 Hash 屬性的。直接開(kāi)放了配置,運(yùn)行用戶直接把維表 Join 的 key 作為 Hash 的條件,將數(shù)據(jù)進(jìn)行分區(qū)。這樣就能保證下游每一個(gè)算子的 subtask 之間的訪問(wèn)空間是獨(dú)立的,這樣可以大大的提升開(kāi)始的緩存命中率。

        除了以上的優(yōu)化,還有兩點(diǎn)目前正在開(kāi)發(fā)的維表優(yōu)化。

        1. 廣播維表:有些場(chǎng)景下維表比較小,而且更新不頻繁,但作業(yè)的 QPS  特別高。如果依然訪問(wèn)外部系統(tǒng)進(jìn)行 Join,那么壓力會(huì)非常大。并且當(dāng)作業(yè)  Failover 的時(shí)候 local cache 會(huì)全部失效,進(jìn)而又對(duì)外部系統(tǒng)造成很大訪問(wèn)壓力。那么改進(jìn)的方案是定期全量 scan 維表,通過(guò)Join key hash 的方式發(fā)送到下游,更新每個(gè)維表 subtask 的緩存。
        2. Mini-Batch:主要針對(duì)一些 I/O 請(qǐng)求比較高,系統(tǒng)又支持 batch 請(qǐng)求的能力,比如說(shuō) RPC、HBase、Redis 等。以往的方式都是逐條的請(qǐng)求,且 Async I/O 只能解決 I/O 延遲的問(wèn)題,并不能解決訪問(wèn)量的問(wèn)題。通過(guò)實(shí)現(xiàn) Mini-Batch 版本的維表算子,大量降低維表關(guān)聯(lián)訪問(wèn)外部存儲(chǔ)次數(shù)。

        Join 優(yōu)化


        目前 Flink 支持的三種 Join 方式;分別是 Interval Join、Regular Join、Temporal Table Function。

        前兩種語(yǔ)義是一樣的流和流 Join。而 Temporal Table 是流和表的的 Join,右邊的流會(huì)以主鍵的形式形成一張表,左邊的流去 Join 這張表,這樣一次 Join 只能有一條數(shù)據(jù)參與并且只返回一個(gè)結(jié)果。而不是有多少條都能 Join 到。

        它們之間的區(qū)別列了幾點(diǎn):


        可以看到三種 Join 方式都有它本身的一些缺陷。

        1. Interval Join 目前使用上的缺陷是它會(huì)產(chǎn)生一個(gè) out join 數(shù)據(jù)和 watermark  亂序的情況。
        2. Regular Join 的話,它最大的缺陷是 retract 放大(之后會(huì)詳細(xì)說(shuō)明這個(gè)問(wèn)題)。
        3. Temporal table function 的問(wèn)題較其它多一些,有三個(gè)問(wèn)題。

          • 不支持 DDl
          • 不支持 out join 的語(yǔ)義 (FLINK-7865 的限制)
          • 右側(cè)數(shù)據(jù)斷流導(dǎo)致 watermark 不更新,下游無(wú)法正確計(jì)算 (FLINK-18934)

        對(duì)于以上的不足之處字節(jié)內(nèi)部都做了對(duì)應(yīng)的修改。

        增強(qiáng) Checkpoint 恢復(fù)能力


        對(duì)于 SQL 作業(yè)來(lái)說(shuō)一旦發(fā)生條件變化都很難從 checkpoint 中恢復(fù)。

        SQL 作業(yè)確實(shí)從 checkpoint 恢復(fù)的能力比較弱,因?yàn)橛袝r(shí)候做一些看起來(lái)不太影響 checkpoint 的修改,它仍然無(wú)法恢復(fù)。無(wú)法恢復(fù)主要有兩點(diǎn);

        • 第一點(diǎn):operate ID 是自動(dòng)生成的,然后因?yàn)槟承┰驅(qū)е滤傻?ID 改變了。

        • 第二點(diǎn):算子的計(jì)算的邏輯發(fā)生了改變,即算子內(nèi)部的狀態(tài)的定義發(fā)生了變化。


        例子1:并行度發(fā)生修改導(dǎo)致無(wú)法恢復(fù)。



        source 是一個(gè)最常見(jiàn)的有狀態(tài)的算子,source 如果和之后的算子的 operator chain 邏輯發(fā)生了改變,是完全無(wú)法恢復(fù)的。 

        下圖左上是正常的社區(qū)版的作業(yè)會(huì)產(chǎn)生的一個(gè)邏輯, source 和后面的并行度一樣的算子會(huì)被 chain 在一起,用戶是無(wú)法去改變的。但算子并行度是常會(huì)會(huì)發(fā)生修改,比如說(shuō) source 由原來(lái)的 100 修改為 50,cacl 的并發(fā)是 100。此時(shí) chain 的邏輯就會(huì)發(fā)生變化。



        針對(duì)這種情況,字節(jié)內(nèi)部做了修改,允許用戶去配置,即使 source 的并行度跟后面整體的作業(yè)的并行度是一樣的,也讓其不與之后的算子 chain 在一起。

        例子2:DAG 改變導(dǎo)致無(wú)法恢復(fù)。



        這是一種比較特殊的情況,有一條 SQL (上圖),可以看到 source 沒(méi)有發(fā)生變化,之后的三個(gè)聚合互相之間沒(méi)有關(guān)系,狀態(tài)竟然也是無(wú)法恢復(fù)。

        作業(yè)之所以無(wú)法恢復(fù),是因?yàn)?operator ID 生成規(guī)則導(dǎo)致的。目前 SQL 中 operator ID 的生成的規(guī)則與上游、本身配置以及下游可以 chain 在一起的算子的數(shù)量都有關(guān)系。 因?yàn)樾略鲋笜?biāo),會(huì)導(dǎo)致新增一個(gè) Calc 的下游節(jié)點(diǎn),進(jìn)而導(dǎo)致 operator ID 發(fā)生變化。 

        為了處理這種情況,支持了一種特殊的配置模式,允許用戶配置生成 operator ID 的時(shí)候可以忽略下游 chain 在一起算子數(shù)量的條件。

        例子3:新增聚合指標(biāo)導(dǎo)致無(wú)法恢復(fù)

        這塊是用戶訴求最大的,也是最復(fù)雜的部分。用戶期望新增一些聚合指標(biāo)后,原來(lái)的指標(biāo)要能從 checkpoint 中恢復(fù)。



        可以看到圖中左部分是 SQL 生成的算子邏輯。count,sum,sum,count,distinct 會(huì)以一個(gè) BaseRow 的結(jié)構(gòu)存儲(chǔ)在 ValueState 中。distinct 比較特殊一些,還會(huì)單獨(dú)存儲(chǔ)在一個(gè) MapState 中。 

        這導(dǎo)致了如新增或者減少指標(biāo),都會(huì)使原先的狀態(tài)沒(méi)辦法從 ValueState 中正常恢復(fù),因?yàn)?VauleState 中存儲(chǔ)的狀態(tài) “schema” 和新的(修改指標(biāo)后)的 “schema”不匹配,無(wú)法正常反序列化。




        在討論解決方案之前,我們先回顧一下正常的恢復(fù)流。先從 checkpoint 中恢復(fù)出狀態(tài)的 serializer,再通過(guò) serializer 把狀態(tài)恢復(fù)。接下來(lái) operator 去注冊(cè)新的狀態(tài)定義,新的狀態(tài)定義會(huì)和原先的狀態(tài)定義進(jìn)行一個(gè)兼容性對(duì)比,如果是兼容則狀態(tài)恢復(fù)成功,如果不兼容則拋出異常任務(wù)失敗。 

        不兼容的另一種處理情況是允許返回一個(gè) migration(實(shí)現(xiàn)兩個(gè)不匹配類型的狀態(tài)恢復(fù))那么也可以恢復(fù)成功。

        針對(duì)上面的流程做出對(duì)應(yīng)的修改:

        1. 第一步使新舊 serializer 互相知道對(duì)方的信息,添加一個(gè)接口,且修改了 statebackend resolve compatibility 的過(guò)程,把舊的信息傳遞給新的,并使其獲取整個(gè) migrate 過(guò)程。
        2. 第二步判斷新老之間是否兼容,如果不兼容是否需要做一次 migration。然后讓舊的 serializer 去恢復(fù)一遍狀態(tài),并使用新的 serializer 寫(xiě)入新的狀態(tài)。
        3. 對(duì) aggregation 的代碼生成進(jìn)行處理,當(dāng)發(fā)現(xiàn) aggregation 拿到的是指標(biāo)是  null,那么將做一些初始化的工作。

        通過(guò)以上的修改基本就可以做到正常的,新增的聚合指標(biāo)從拆開(kāi)的方案恢復(fù)。

        三、流批一體探索


        業(yè)務(wù)現(xiàn)狀


        字節(jié)跳動(dòng)內(nèi)部對(duì)流批一體和業(yè)務(wù)推廣之前,技術(shù)團(tuán)隊(duì)提前做了大量技術(shù)方面的探索。整體判斷是 SQL 這一層是可以做到流批一體的語(yǔ)義,但實(shí)踐中卻又發(fā)現(xiàn)不少不同。

        比如說(shuō)流計(jì)算的 session window,或是基于處理時(shí)間的 window,在批計(jì)算中無(wú)法做到。同時(shí) SQL 在批計(jì)算中一些復(fù)雜的 over window,在流計(jì)算中也沒(méi)有對(duì)應(yīng)的實(shí)現(xiàn)。

        但這些特別的場(chǎng)景可能只占 10% 甚至更少,所以用 SQL 去落實(shí)流批一體是可行的。



        流批一體


        這張圖是比較常見(jiàn)的和大多數(shù)公司里的架構(gòu)都類似。這種架構(gòu)有什么缺陷呢?

        1. 數(shù)據(jù)不同源:批任務(wù)一般會(huì)有一次前置處理任務(wù),不管是離線的也好實(shí)時(shí)的也好,預(yù)先進(jìn)過(guò)一層加工后寫(xiě)入 Hive。而實(shí)時(shí)任務(wù)是從 kafka 讀取原始的數(shù)據(jù),可能是 json 格式,也可能是 avro 等等。直接導(dǎo)致批任務(wù)中可執(zhí)行的 SQL 在流任務(wù)中沒(méi)有結(jié)果生成或者執(zhí)行結(jié)果不對(duì)。
        2. 計(jì)算不同源:批任務(wù)一般是 Hive + Spark 的架構(gòu),而流任務(wù)基本都是基于  Flink。不同的執(zhí)行引擎在實(shí)現(xiàn)上都會(huì)有一些差異,導(dǎo)致結(jié)果不一致。不同的執(zhí)行引擎有不同的 API 定義 UDF,它們之間也是無(wú)法被公用的。大部分情況下都是維護(hù)兩套基于不同 API 實(shí)現(xiàn)的相同功能的 UDF。

        鑒于上面的問(wèn)題,提出了基于 Flink 的流批一體架構(gòu)來(lái)解決。

        1. 數(shù)據(jù)不同源:流式處理先通過(guò) Flink 處理之后寫(xiě)入 MQ 供下游流式 Flink job 去消費(fèi),對(duì)于批式處理由 Flink 處理后流式寫(xiě)入到 Hive,再由批式的 Flink job 去處理。
        2. 引擎不同源:既然都是基于 Flink 開(kāi)發(fā)的流式,批式 job,自然沒(méi)有計(jì)算不同源問(wèn)題,同時(shí)也避免了維護(hù)多套相同功能的 UDF。

        基于 Flink 實(shí)現(xiàn)的流批一體架構(gòu):


        業(yè)務(wù)收益


        1. 統(tǒng)一的 SQL通過(guò)一套 SQL 來(lái)表達(dá)流和批計(jì)算兩種場(chǎng)景,減少開(kāi)發(fā)維護(hù)工作。
        2. 復(fù)用 UDF:流式和批式計(jì)算可以共用一套 UDF。這對(duì)業(yè)務(wù)來(lái)說(shuō)是有積極意義的。
        3. 引擎統(tǒng)一:對(duì)于業(yè)務(wù)的學(xué)習(xí)成本和架構(gòu)的維護(hù)成本都會(huì)降低很多。
        4. 優(yōu)化統(tǒng)一:大部分的優(yōu)化都是可以同時(shí)作用在流式和批式計(jì)算上,比如對(duì) planner、operator 的優(yōu)化流和批可以共享。


        四、未來(lái)工作和規(guī)劃


        優(yōu)化 retract 放大問(wèn)題



        什么是 retract 放大?

        圖有 4 張表,第一張表進(jìn)行去重操作 (Dedup),之后分別和另外三張表做  Join。邏輯比較簡(jiǎn)單,表 A 輸入(A1),最后產(chǎn)出 (A1,B1,C1,D1) 的結(jié)果。

        當(dāng)表 A 輸入一個(gè) A2,因?yàn)?Dedup 算子,導(dǎo)致數(shù)據(jù)需要去重,則向下游發(fā)送一個(gè)撤回 A1 的操作 -(A1) 和一個(gè)新增 A2 的操作 +(A2)。第一個(gè) Join 算子收到 -(A1)  后會(huì)將 -(A1) 變成 -(A1,B1) 和 +(null,B1)(為了保持它認(rèn)為的正確語(yǔ)義) 發(fā)送到下游。之后又收到了 +(A2) ,則又向下游發(fā)送 -(null,B1) 和 +(A2,B1) 這樣操作就放大了兩倍。再經(jīng)由下游的算子操作會(huì)一直被放大,到最終的 sink 輸出可能會(huì)被放大  1000 倍之多。



        如何解決? 

        將原先 retract 的兩條數(shù)據(jù)變成一條 changelog 的格式數(shù)據(jù),在算子之間傳遞。算子接收到 changelog 后處理變更,然后僅僅向下游發(fā)送一個(gè)變更 changelog 即可。

        未來(lái)規(guī)劃


        1.功能優(yōu)化


        • 支持所有類型聚合指標(biāo)變更的 checkpoint 恢復(fù)能力

        • window local-global

        • 事件時(shí)間的 Fast Emit

        • 廣播維表

        • 更多算子的 Mini-Batch 支持:維表,TopN,Join 等

        • 全面兼容 Hive SQL 語(yǔ)法


        2.業(yè)務(wù)擴(kuò)展


        • 進(jìn)一步推動(dòng)流式 SQL 達(dá)到 80%

        • 探索落地流批一體產(chǎn)品形態(tài)

        • 推動(dòng)實(shí)時(shí)數(shù)倉(cāng)標(biāo)準(zhǔn)化


        瀏覽 89
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            国产精品一区二区男女羞羞无遮挡 | 中文字字幕在线中文乱 | 香蕉91视频在线观看大全 | 日韩精品一区二区三区色欲AV | 超碰影院在线 | 国产看逼 | 日韩中文字幕视频在线 | 欲乱合集二免费视频 | 亚洲无码看片 | 中国一级操逼视频 |