一套基于 Flink 構(gòu)建數(shù)據(jù)集成平臺的設(shè)計與實現(xiàn)方案
摘要:數(shù)據(jù)倉庫有四個基本的特征:面向主題的、集成的、相對穩(wěn)定的、反映歷史變化的。其中數(shù)據(jù)集成是數(shù)據(jù)倉庫構(gòu)建的首要前提,指將多個分散的、異構(gòu)的數(shù)據(jù)源整合在一起以便于后續(xù)的數(shù)據(jù)分析。將數(shù)據(jù)集成過程平臺化,將極大提升數(shù)據(jù)開發(fā)人員的效率,本文主要內(nèi)容為:
數(shù)據(jù)集成 VS 數(shù)據(jù)同步 集成需求 數(shù)據(jù)集成 V1 數(shù)據(jù)集成 V2 線上效果 總結(jié)
A data warehouse is a subject-oriented, integrated, nonvolatile, and time-variant collection of data in support of management’s decisions.
—— Bill Inmon
一、數(shù)據(jù)集成 VS 數(shù)據(jù)同步
「數(shù)據(jù)集成」特指面向數(shù)據(jù)倉庫 ODS 層的數(shù)據(jù)同步過程; 「數(shù)據(jù)同步」面向的是一般化的 Source 到 Sink 的數(shù)據(jù)傳輸過程。

「數(shù)據(jù)同步平臺」提供基礎(chǔ)能力,不摻雜具體的業(yè)務(wù)邏輯。 「數(shù)據(jù)集成平臺」是構(gòu)建在「數(shù)據(jù)同步平臺」之上的,除了將原始數(shù)據(jù)同步之外還包含了一些聚合的邏輯 (如通過數(shù)據(jù)庫的日志數(shù)據(jù)對快照數(shù)據(jù)進(jìn)行恢復(fù),下文將會詳細(xì)展開) 以及數(shù)倉規(guī)范相關(guān)的內(nèi)容 (如數(shù)倉 ODS 層庫表命名規(guī)范) 等。
二、集成需求
目前伴魚內(nèi)部數(shù)據(jù)的集成需求主要體現(xiàn)在三塊:Stat Log (業(yè)務(wù)標(biāo)準(zhǔn)化日志或稱統(tǒng)計日志)、TiDB 及 MongoDB。除此之外還有一些 Service Log、Nginx Log 等,此類不具備代表性不在本文介紹。另外,由于實時數(shù)倉正處于建設(shè)過程中,目前「數(shù)據(jù)集成平臺」只涵蓋離線數(shù)倉 (Hive)。
Stat Log:業(yè)務(wù)落盤的日志將由 FileBeat 組件收集至 Kafka。由于日志為 Append Only 類型, 因此 Stat Log 集成相對簡單,只需將 Kafka 數(shù)據(jù)同步至 Hive 即可。 DB (TiDB、MongoDB):DB 數(shù)據(jù)相對麻煩,核心訴求是數(shù)倉中能夠存在業(yè)務(wù)數(shù)據(jù)庫的鏡像,即存在業(yè)務(wù)數(shù)據(jù)庫中某一時刻(天級 or 小時級)的數(shù)據(jù)快照,當(dāng)然有時也有對數(shù)據(jù)變更過程的分析需求。因此 DB 數(shù)據(jù)集成需要將這兩個方面都考慮進(jìn)去。
三、數(shù)據(jù)集成 V1
3.1 Stat Log

3.2 DB


四、數(shù)據(jù)集成 V2
4.1 Stat Log

checkpoint: 10 min watermark: 1 min partition.time-extractor.kind: ‘custom’ sink.partition-commit.delay: ‘3600s’ sink.partition-commit.policy.kind: ‘metastore,success-file’ sink.partition-commit.trigger: ‘partition-time’
4.2 DB

用戶提交集成任務(wù)后將同步創(chuàng)建三個任務(wù):
增量任務(wù) (流):「增量任務(wù)」將 DB 日志數(shù)據(jù)由 Kafka 同步至 Hive。由于采集組件都是按照集群粒度進(jìn)行采集,且集群數(shù)量有限,目前都是手動的方式將同步的任務(wù)在「實時計算平臺」創(chuàng)建,集成任務(wù)創(chuàng)建時默認(rèn)假定同步任務(wù)已經(jīng) ready,待「數(shù)據(jù)同步平臺」落地后可以同步做更多的自動化操作和校驗。 存量任務(wù) (批):要想還原出快照數(shù)據(jù)則至少需要一份初始的快照數(shù)據(jù),因此「存量任務(wù)」的目的是從業(yè)務(wù)數(shù)據(jù)庫拉取集成時數(shù)據(jù)的初始快照數(shù)據(jù)。 Merge 任務(wù) (批):「Merge 任務(wù)」將存量數(shù)據(jù)和增量數(shù)據(jù)進(jìn)行聚合以還原快照數(shù)據(jù)。還原后的快照數(shù)據(jù)可作為下一日的存量,因此「存量任務(wù)」只需調(diào)度執(zhí)行一次,獲取初始快照數(shù)據(jù)即可。

DB 的數(shù)據(jù)集成相較于 Stat Log 復(fù)雜性高,接下來以 TiDB 的數(shù)據(jù)集成為例講述設(shè)計過程中的一些要點 (MongoDB 流程類似,區(qū)別在于存量同步工具及數(shù)據(jù)解析)。
■?4.2.1 需求表達(dá)
TiDB 源信息:包括集群、庫、表。 集成方式:集成方式表示的是快照數(shù)據(jù)的聚合粒度,包括全量和增量。全量表示需要將存量的快照數(shù)據(jù)與今日的增量日志數(shù)據(jù)聚合,而增量表示只需要將今日的增量日志數(shù)據(jù)聚合 (即便增量方式無需和存量的快照數(shù)據(jù)聚合,但初始存量的獲取依舊是有必要的,具體的使用形式由數(shù)倉人員自行決定)。
■?4.2.2 存量任務(wù)


同步觸發(fā)數(shù)據(jù)庫平臺進(jìn)行備份恢復(fù),產(chǎn)生回執(zhí) ID; 通過回執(zhí) ID 輪訓(xùn)備份恢復(fù)狀態(tài),恢復(fù)失敗需要 DBA 定位異常,故將下線整個工作流,待恢復(fù)成功可在平臺重新恢復(fù)執(zhí)行「存量任務(wù)」。恢復(fù)進(jìn)行中,工作流直接退出,借助 DS 定時調(diào)度等待下次喚醒?;謴?fù)成功,進(jìn)入后續(xù)邏輯; 從恢復(fù)庫中拉取存量,判定存量是否存在數(shù)據(jù)差,若存在則執(zhí)行 Merge 任務(wù)的補數(shù)操作,整個操作可冪等執(zhí)行,如若失敗退出此次工作流,等待下次調(diào)度; 成功,下線整個工作流,任務(wù)完成。
■?4.2.3 Merge 任務(wù)

校驗文件標(biāo)記是否存在,若不存在說明數(shù)據(jù)未 ready ,進(jìn)行報警并退出工作流等待下次調(diào)度; 執(zhí)行 Merge 操作,失敗報警并退出工作流等待下次調(diào)度; 成功,退出工作流等待下次調(diào)度。
加載存量、增量數(shù)據(jù),統(tǒng)一數(shù)據(jù)格式(核心字段:主鍵 Key 作為同一條數(shù)據(jù)的聚合字段;CommitTs 標(biāo)識 binlog 的提交時間,存量數(shù)據(jù)默認(rèn)為 0 早于增量數(shù)據(jù);OpType 標(biāo)識數(shù)據(jù)操作類型,包括:Insert、Update、Delete,存量數(shù)據(jù)默認(rèn)為 Insert 類型),將兩份數(shù)據(jù)進(jìn)行 union; 按照主鍵聚合; 保留聚合后 CommitTs 最大的數(shù)據(jù)條目,其余丟棄; 過濾 OpType 為 Delete 類型的數(shù)據(jù)條目; 輸出聚合結(jié)果。
allMergedData.groupBy(x -> x.getKeyCols()).reduce(new ReduceFunction() { public MergeTransform reduce(MergeTransform value1, MergeTransform value2) throws Exception {if (value1.getCommitTS() > value2.getCommitTS()){return value1;}return value2;}}).filter(new FilterFunction() { //增量:過濾掉 op=delete public boolean filter(MergeTransform merge) throws Exception {if (merge.getOpType().equals(OPType.DELETE)){return false;}return true;}}).map(x -> x.getHiveColsText()).writeAsText(outPath);
■?4.2.4 容錯性與數(shù)據(jù)一致性保證
我們大體可以從三個任務(wù)故障場景下的處理方式來驗證方案的容錯性。
「存量任務(wù)」異常失敗:通常是備份恢復(fù)失敗導(dǎo)致,DS 任務(wù)將發(fā)送失敗報警,因「數(shù)據(jù)庫平臺」暫不支持恢復(fù)重試,需人工介入處理。同時「Merge 任務(wù)」檢測不到存量的 _SUCCESS 標(biāo)記,工作流不會向后推進(jìn)。 「增量任務(wù)」異常失?。?/span>Flink 自身的容錯機制以及「實時計算平臺」的外部檢測機制保障「增量任務(wù)」的容錯性。若在「Merge 任務(wù)」調(diào)度執(zhí)行期間「增量任務(wù)」尚未恢復(fù),將誤以為該小時無增量數(shù)據(jù)跳過執(zhí)行,此時相當(dāng)于快照更新延遲(Merge 是將全天的增量數(shù)據(jù)與存量聚合,在之后的調(diào)度時間點如果「增量任務(wù)」恢復(fù)又可以聚合得到最新的快照),或者在「增量任務(wù)」恢復(fù)后可人為觸發(fā)「Merge 任務(wù)」補數(shù)。 「Merge 任務(wù)」異常失?。?/span>任務(wù)具有冪等性,通過設(shè)置 DS 任務(wù)失敗后的重試機制保障容錯性,同時發(fā)送失敗報警。
數(shù)據(jù)的一致性體現(xiàn)在 Merge 操作。兩份數(shù)據(jù)聚合,從代碼層面一定可以確保算法的正確性 (這是可驗證的、可測試的),那么唯一可能導(dǎo)致數(shù)據(jù)不一致的情況出現(xiàn)在兩份輸入的數(shù)據(jù)上,即存量和增量,存在兩種情況:
存量和增量數(shù)據(jù)有交疊:體現(xiàn)在初始存量與整點的增量數(shù)據(jù)聚合場景,由于算法天然的去重性可以保證數(shù)據(jù)的一致。 存量和增量數(shù)據(jù)有缺失:體現(xiàn)在增量數(shù)據(jù)的缺失上,而增量數(shù)據(jù)是由 Flink 將 Kafka 數(shù)據(jù)寫入 Hive 的,這個過程中是有一定的可能性造成數(shù)據(jù)的不一致,即分區(qū)提交后的亂序數(shù)據(jù)。雖然說亂序數(shù)據(jù)到來后的下一次 checkpoint 時間點分區(qū)將再次提交,但下游任務(wù)一般是檢測到首次分區(qū)提交就會觸發(fā)執(zhí)行,造成下游任務(wù)的數(shù)據(jù)不一致。
針對 Flink 流式寫 Hive 過程中的亂序數(shù)據(jù)處理可以采取兩種手段:
一是 Kafka 設(shè)置單分區(qū),多分區(qū)是產(chǎn)生導(dǎo)致亂序的根因,通過避免多分區(qū)消除數(shù)據(jù)亂序。 二是報警補償,亂序一旦產(chǎn)生流式任務(wù)是無法完全避免的 (可通過 watermark 設(shè)置亂序容忍時間,但終有一個界限),那么只能通過報警做事后補償。
五、線上效果



六、總結(jié)
