Flink SQL 在字節(jié)跳動(dòng)的優(yōu)化與實(shí)踐
摘要:本文由 Apache Flink Committer,字節(jié)跳動(dòng)架構(gòu)研發(fā)工程師李本超分享,以四個(gè)章節(jié)來(lái)介紹 Flink 在字節(jié)的應(yīng)用實(shí)戰(zhàn)。 內(nèi)容如下:
整體介紹
實(shí)踐優(yōu)化
流批一體
未來(lái)規(guī)劃
Tips:點(diǎn)擊文末「閱讀原文」可查看作者原版分享視頻~
一、整體介紹

2018 年 12 月 Blink 宣布開(kāi)源,經(jīng)歷了約一年的時(shí)間 Flink 1.9 于 2019 年 8 月 22 發(fā)布。在 Flink 1.9 發(fā)布之前字節(jié)跳動(dòng)內(nèi)部基于 master 分支進(jìn)行內(nèi)部的 SQL 平臺(tái)構(gòu)建。經(jīng)歷了 2~3 個(gè)月的時(shí)間字節(jié)內(nèi)部在 19 年 10 月份發(fā)布了基于 Flink 1.9 的 Blink planner 構(gòu)建的 Streaming SQL 平臺(tái),并進(jìn)行內(nèi)部推廣。在這個(gè)過(guò)程中發(fā)現(xiàn)了一些比較有意思的需求場(chǎng)景,以及一些較為奇怪的 BUG。
基于 1.9 的 Flink SQL 擴(kuò)展
create table create view create function add resource
source: RocketMQ sink: RocketMQ/ClickHouse/Doris/LogHouse/Redis/Abase/Bytable/ByteSQL/RPC/Print/Metrics
在線的界面化 SQL 平臺(tái)

除了對(duì) Flink 本身功能的擴(kuò)展,字節(jié)內(nèi)部也上線了一個(gè) SQL 平臺(tái),支持以下功能:
SQL 編輯 SQL 解析 SQL 調(diào)試 自定義 UDF 和 Connector 版本控制 任務(wù)管理
二、實(shí)踐優(yōu)化
Window 性能優(yōu)化
-- my_window 為自定義的窗口,滿足特定的劃分方式SELECTroom_id,COUNT(DISTINCT user_id)FROM MySourceGROUP BYroom_id,my_window(ts, INTERVAL '1' HOURS)
SELECTroom_id,COUNT(DISTINCT user_id)FROM MySourceGROUP BYroom_id,TUMBLE(ts, INTERVAL '7' DAY, INTERVAL '3', DAY)
維表優(yōu)化
所以用戶希望如果 Join 不到,則暫時(shí)將數(shù)據(jù)緩存起來(lái)之后再進(jìn)行嘗試,并且可以控制嘗試次數(shù),能夠自定義延遲 Join 的規(guī)則。這個(gè)需求場(chǎng)景不單單在字節(jié)內(nèi)部,社區(qū)的很多同學(xué)也有類似的需求。


當(dāng)作業(yè)并行度比較大,每一個(gè)維表 Join 的 subtask,訪問(wèn)的是所有的緩存空間,這樣對(duì)緩存來(lái)說(shuō)有很大的壓力。
廣播維表:有些場(chǎng)景下維表比較小,而且更新不頻繁,但作業(yè)的 QPS 特別高。如果依然訪問(wèn)外部系統(tǒng)進(jìn)行 Join,那么壓力會(huì)非常大。并且當(dāng)作業(yè) Failover 的時(shí)候 local cache 會(huì)全部失效,進(jìn)而又對(duì)外部系統(tǒng)造成很大訪問(wèn)壓力。那么改進(jìn)的方案是定期全量 scan 維表,通過(guò)Join key hash 的方式發(fā)送到下游,更新每個(gè)維表 subtask 的緩存。 Mini-Batch:主要針對(duì)一些 I/O 請(qǐng)求比較高,系統(tǒng)又支持 batch 請(qǐng)求的能力,比如說(shuō) RPC、HBase、Redis 等。以往的方式都是逐條的請(qǐng)求,且 Async I/O 只能解決 I/O 延遲的問(wèn)題,并不能解決訪問(wèn)量的問(wèn)題。通過(guò)實(shí)現(xiàn) Mini-Batch 版本的維表算子,大量降低維表關(guān)聯(lián)訪問(wèn)外部存儲(chǔ)次數(shù)。
Join 優(yōu)化

Interval Join 目前使用上的缺陷是它會(huì)產(chǎn)生一個(gè) out join 數(shù)據(jù)和 watermark 亂序的情況。 Regular Join 的話,它最大的缺陷是 retract 放大(之后會(huì)詳細(xì)說(shuō)明這個(gè)問(wèn)題)。 Temporal table function 的問(wèn)題較其它多一些,有三個(gè)問(wèn)題。
不支持 DDl 不支持 out join 的語(yǔ)義 (FLINK-7865 的限制) 右側(cè)數(shù)據(jù)斷流導(dǎo)致 watermark 不更新,下游無(wú)法正確計(jì)算 (FLINK-18934)
增強(qiáng) Checkpoint 恢復(fù)能力
第一點(diǎn):operate ID 是自動(dòng)生成的,然后因?yàn)槟承┰驅(qū)е滤傻?ID 改變了。
第二點(diǎn):算子的計(jì)算的邏輯發(fā)生了改變,即算子內(nèi)部的狀態(tài)的定義發(fā)生了變化。

下圖左上是正常的社區(qū)版的作業(yè)會(huì)產(chǎn)生的一個(gè)邏輯, source 和后面的并行度一樣的算子會(huì)被 chain 在一起,用戶是無(wú)法去改變的。但算子并行度是常會(huì)會(huì)發(fā)生修改,比如說(shuō) source 由原來(lái)的 100 修改為 50,cacl 的并發(fā)是 100。此時(shí) chain 的邏輯就會(huì)發(fā)生變化。


為了處理這種情況,支持了一種特殊的配置模式,允許用戶配置生成 operator ID 的時(shí)候可以忽略下游 chain 在一起算子數(shù)量的條件。

這導(dǎo)致了如新增或者減少指標(biāo),都會(huì)使原先的狀態(tài)沒(méi)辦法從 ValueState 中正常恢復(fù),因?yàn)?VauleState 中存儲(chǔ)的狀態(tài) “schema” 和新的(修改指標(biāo)后)的 “schema”不匹配,無(wú)法正常反序列化。


不兼容的另一種處理情況是允許返回一個(gè) migration(實(shí)現(xiàn)兩個(gè)不匹配類型的狀態(tài)恢復(fù))那么也可以恢復(fù)成功。
第一步使新舊 serializer 互相知道對(duì)方的信息,添加一個(gè)接口,且修改了 statebackend resolve compatibility 的過(guò)程,把舊的信息傳遞給新的,并使其獲取整個(gè) migrate 過(guò)程。 第二步判斷新老之間是否兼容,如果不兼容是否需要做一次 migration。然后讓舊的 serializer 去恢復(fù)一遍狀態(tài),并使用新的 serializer 寫(xiě)入新的狀態(tài)。 對(duì) aggregation 的代碼生成進(jìn)行處理,當(dāng)發(fā)現(xiàn) aggregation 拿到的是指標(biāo)是 null,那么將做一些初始化的工作。
三、流批一體探索
業(yè)務(wù)現(xiàn)狀

流批一體
數(shù)據(jù)不同源:批任務(wù)一般會(huì)有一次前置處理任務(wù),不管是離線的也好實(shí)時(shí)的也好,預(yù)先進(jìn)過(guò)一層加工后寫(xiě)入 Hive。而實(shí)時(shí)任務(wù)是從 kafka 讀取原始的數(shù)據(jù),可能是 json 格式,也可能是 avro 等等。直接導(dǎo)致批任務(wù)中可執(zhí)行的 SQL 在流任務(wù)中沒(méi)有結(jié)果生成或者執(zhí)行結(jié)果不對(duì)。 計(jì)算不同源:批任務(wù)一般是 Hive + Spark 的架構(gòu),而流任務(wù)基本都是基于 Flink。不同的執(zhí)行引擎在實(shí)現(xiàn)上都會(huì)有一些差異,導(dǎo)致結(jié)果不一致。不同的執(zhí)行引擎有不同的 API 定義 UDF,它們之間也是無(wú)法被公用的。大部分情況下都是維護(hù)兩套基于不同 API 實(shí)現(xiàn)的相同功能的 UDF。
數(shù)據(jù)不同源:流式處理先通過(guò) Flink 處理之后寫(xiě)入 MQ 供下游流式 Flink job 去消費(fèi),對(duì)于批式處理由 Flink 處理后流式寫(xiě)入到 Hive,再由批式的 Flink job 去處理。 引擎不同源:既然都是基于 Flink 開(kāi)發(fā)的流式,批式 job,自然沒(méi)有計(jì)算不同源問(wèn)題,同時(shí)也避免了維護(hù)多套相同功能的 UDF。

業(yè)務(wù)收益
統(tǒng)一的 SQL:通過(guò)一套 SQL 來(lái)表達(dá)流和批計(jì)算兩種場(chǎng)景,減少開(kāi)發(fā)維護(hù)工作。 復(fù)用 UDF:流式和批式計(jì)算可以共用一套 UDF。這對(duì)業(yè)務(wù)來(lái)說(shuō)是有積極意義的。 引擎統(tǒng)一:對(duì)于業(yè)務(wù)的學(xué)習(xí)成本和架構(gòu)的維護(hù)成本都會(huì)降低很多。 優(yōu)化統(tǒng)一:大部分的優(yōu)化都是可以同時(shí)作用在流式和批式計(jì)算上,比如對(duì) planner、operator 的優(yōu)化流和批可以共享。
四、未來(lái)工作和規(guī)劃
優(yōu)化 retract 放大問(wèn)題


將原先 retract 的兩條數(shù)據(jù)變成一條 changelog 的格式數(shù)據(jù),在算子之間傳遞。算子接收到 changelog 后處理變更,然后僅僅向下游發(fā)送一個(gè)變更 changelog 即可。

1.功能優(yōu)化
支持所有類型聚合指標(biāo)變更的 checkpoint 恢復(fù)能力
window local-global
事件時(shí)間的 Fast Emit
廣播維表
更多算子的 Mini-Batch 支持:維表,TopN,Join 等
全面兼容 Hive SQL 語(yǔ)法
2.業(yè)務(wù)擴(kuò)展
進(jìn)一步推動(dòng)流式 SQL 達(dá)到 80%
探索落地流批一體產(chǎn)品形態(tài)
推動(dòng)實(shí)時(shí)數(shù)倉(cāng)標(biāo)準(zhǔn)化
