Iceberg 數(shù)據(jù)湖 CDC 數(shù)據(jù)實(shí)時(shí)讀寫(xiě)方案及原理
摘要:本文由李勁松、胡爭(zhēng)分享,社區(qū)志愿者楊偉海、李培殿整理。主要介紹在數(shù)據(jù)湖的架構(gòu)中,CDC 數(shù)據(jù)實(shí)時(shí)讀寫(xiě)的方案和原理。文章主要分為 4 個(gè)部分內(nèi)容:
常見(jiàn)的 CDC 分析方案 為何選擇 Flink + Iceberg 如何實(shí)時(shí)寫(xiě)入讀取 未來(lái)規(guī)劃
一、常見(jiàn)的 CDC 分析方案
一、常見(jiàn)的 CDC 分析方案
1.1 離線 HBase 集群分析 CDC 數(shù)據(jù)

1.2 Apache Kudu 維護(hù) CDC 數(shù)據(jù)集

1.3 直接導(dǎo)入 CDC 到 Hive 分析
第三種方案,也是大家在數(shù)倉(cāng)中比較常用的方案,就是把 MySQL 的數(shù)據(jù)寫(xiě)到 Hive,流程是:維護(hù)一個(gè)全量的分區(qū),然后每天做一個(gè)增量的分區(qū),最后把增量分區(qū)寫(xiě)好之后進(jìn)行一次 Merge ,寫(xiě)入一個(gè)新的分區(qū),流程上這樣是走得通的。Hive 之前的全量分區(qū)是不受增量的影響的,只有當(dāng)增量 Merge 成功之后,分區(qū)才可查,才是一個(gè)全新的數(shù)據(jù)。這種純列存的 append 的數(shù)據(jù)對(duì)于分析是非常友好的。

1.4 Spark + Delta 分析 CDC 數(shù)據(jù)
針對(duì)這個(gè)問(wèn)題,Spark + Delta 在分析 CDC 數(shù)據(jù)的時(shí)候提供了 MERGE INTO 的語(yǔ)法。這并不僅僅是對(duì) Hive 數(shù)倉(cāng)的語(yǔ)法簡(jiǎn)化,Spark + Delta 作為新型數(shù)據(jù)湖的架構(gòu)(例如 Iceberg、Hudi),它對(duì)數(shù)據(jù)的管理不是分區(qū),而是文件,因此 Delta 優(yōu)化 MERGE INTO 語(yǔ)法,僅掃描和重寫(xiě)發(fā)生變化的文件即可,因此高效很多。

我們?cè)u(píng)估一下這個(gè)方案,他的優(yōu)點(diǎn)是僅依賴 Spark + Delta 架構(gòu)簡(jiǎn)潔、沒(méi)有在線服務(wù)、列存,分析速度非???。優(yōu)化之后的 MERGE INTO 語(yǔ)法速度也夠快。
這個(gè)方案,業(yè)務(wù)上是一個(gè) Copy On Write 的一個(gè)方案,它只需要 copy 少量的文件,可以讓延遲做的相對(duì)低。理論上,在更新的數(shù)據(jù)跟現(xiàn)有的存量沒(méi)有很大重疊的話,可以把天級(jí)別的延遲做到小時(shí)級(jí)別的延遲,性能也是可以跟得上的。

這個(gè)方案在 Hive 倉(cāng)庫(kù)處理 upsert 數(shù)據(jù)的路上已經(jīng)前進(jìn)了一小步了。但小時(shí)級(jí)別的延遲畢竟不如實(shí)時(shí)更有效,因此這個(gè)方案最大的缺點(diǎn)在 Copy On Write 的 Merge 有一定的開(kāi)銷(xiāo),延遲不能做的太低。
第一部分大概現(xiàn)有的方案就是這么多,同時(shí)還需要再?gòu)?qiáng)調(diào)一下,upsert 之所以如此重要,是因?yàn)樵跀?shù)據(jù)湖的方案中,upsert 是實(shí)現(xiàn)數(shù)據(jù)庫(kù)準(zhǔn)實(shí)時(shí)、實(shí)時(shí)入湖的一個(gè)關(guān)鍵技術(shù)點(diǎn)。
二、為何選擇 Flink + Iceberg
二、為何選擇 Flink + Iceberg
2.1 Flink 對(duì) CDC 數(shù)據(jù)消費(fèi)的支持
第一,F(xiàn)link 原生支持 CDC 數(shù)據(jù)消費(fèi)。在前文 Spark + Delta 的方案中,MARGE INTO 的語(yǔ)法,用戶需要感知 CDC 的屬性概念,然后寫(xiě)到 merge 的語(yǔ)法上來(lái)。但是 Flink 是原生支持 CDC 數(shù)據(jù)的。用戶只要聲明一個(gè) Debezium 或者其他 CDC 的 format,F(xiàn)link 上面的 SQL 是不需要感知任何 CDC 或者 upsert 的屬性的。Flink 中內(nèi)置了 hidden column 來(lái)標(biāo)識(shí)它 CDC 的類型數(shù)據(jù),所以對(duì)用戶而言比較簡(jiǎn)潔。


2.2 Flink 對(duì) Change Log Stream 的支持

2.3 Flink + Iceberg CDC 導(dǎo)入方案評(píng)估
最后,F(xiàn)link + Iceberg 的 CDC 導(dǎo)入方案的優(yōu)點(diǎn)是什么?
對(duì)比之前的方案,Copy On Write 跟 Merge On Read 都有適用的場(chǎng)景,側(cè)重點(diǎn)不同。Copy On Write 在更新部分文件的場(chǎng)景中,當(dāng)只需要重寫(xiě)其中的一部分文件時(shí)是很高效的,產(chǎn)生的數(shù)據(jù)是純 append 的全量數(shù)據(jù)集,在用于數(shù)據(jù)分析的時(shí)候也是最快的,這是 Copy On Write 的優(yōu)勢(shì)。
另外一個(gè)是 Merge On Read,即將數(shù)據(jù)連同 CDC flag 直接 append 到 Iceberg 當(dāng)中,在 merge 的時(shí)候,把這些增量的數(shù)據(jù)按照一定的組織格式、一定高效的計(jì)算方式與全量的上一次數(shù)據(jù)進(jìn)行一次 merge。這樣的好處是支持近實(shí)時(shí)的導(dǎo)入和實(shí)時(shí)數(shù)據(jù)讀?。贿@套計(jì)算方案的 Flink SQL 原生支持 CDC 的攝入,不需要額外的業(yè)務(wù)字段設(shè)計(jì)。
Iceberg 是統(tǒng)一的數(shù)據(jù)湖存儲(chǔ),支持多樣化的計(jì)算模型,也支持各種引擎(包括 Spark、Presto、hive)來(lái)進(jìn)行分析;產(chǎn)生的 file 都是純列存的,對(duì)于后面的分析是非??斓模籌ceberg 作為數(shù)據(jù)湖基于 snapshot 的設(shè)計(jì),支持增量讀?。籌ceberg 架構(gòu)足夠簡(jiǎn)潔,沒(méi)有在線服務(wù)節(jié)點(diǎn),純 table format 的,這給了上游平臺(tái)方足夠的能力來(lái)定制自己的邏輯和服務(wù)化。

三、如何實(shí)時(shí)寫(xiě)入讀取
三、如何實(shí)時(shí)寫(xiě)入讀取
3.1 批量更新場(chǎng)景和 CDC 寫(xiě)入場(chǎng)景
首先我們來(lái)了解一下在整個(gè)數(shù)據(jù)湖里面批量更新的兩個(gè)場(chǎng)景。
第一批量更新的這種場(chǎng)景,在這個(gè)場(chǎng)景中我們使用一個(gè) SQL 更新了成千上萬(wàn)行的數(shù)據(jù),比如歐洲的 GDPR 策略,當(dāng)一個(gè)用戶注銷(xiāo)掉自己的賬戶之后,后臺(tái)的系統(tǒng)是必須將這個(gè)用戶所有相關(guān)的數(shù)據(jù)全部物理刪除。
第二個(gè)場(chǎng)景是我們需要將 date lake 中一些擁有共同特性的數(shù)據(jù)刪除掉,這個(gè)場(chǎng)景也是屬于批量更新的一個(gè)場(chǎng)景,在這個(gè)場(chǎng)景中刪除的條件可能是任意的條件,跟主鍵(Primary key)沒(méi)有任何關(guān)系,同時(shí)這個(gè)待更新的數(shù)據(jù)集是非常大,這種作業(yè)是一個(gè)長(zhǎng)耗時(shí)低頻次的作業(yè)。
另外是 CDC 寫(xiě)入的場(chǎng)景,對(duì)于對(duì) Flink 來(lái)說(shuō),一般常用的有兩種場(chǎng)景,第一種場(chǎng)景是上游的 Binlog 能夠很快速的寫(xiě)到 data lake 中,然后供不同的分析引擎做分析使用; 第二種場(chǎng)景是使用 Flink 做一些聚合操作,輸出的流是 upsert 類型的數(shù)據(jù)流,也需要能夠?qū)崟r(shí)的寫(xiě)到數(shù)據(jù)湖或者是下游系統(tǒng)中去做分析。如下圖示例中 CDC 寫(xiě)入場(chǎng)景中的 SQL 語(yǔ)句,我們使用單條 SQL 更新一行數(shù)據(jù),這種計(jì)算模式是一種流式增量的導(dǎo)入,而且屬于高頻的更新。

3.2 Apache Iceberg 設(shè)計(jì) CDC 寫(xiě)入方案需要考慮的問(wèn)題
接下來(lái)我們看下 iceberg 對(duì)于 CDC 寫(xiě)入這種場(chǎng)景在方案設(shè)計(jì)時(shí)需要考慮哪些問(wèn)題。
第一是正確性,即需要保證語(yǔ)義及數(shù)據(jù)的正確性,如上游數(shù)據(jù) upsert 到 iceberg 中,當(dāng)上游 upsert 停止后, iceberg 中的數(shù)據(jù)需要和上游系統(tǒng)中的數(shù)據(jù)保持一致。
第二是高效寫(xiě)入,由于 upsert 的寫(xiě)入頻率非常高,我們需要保持高吞吐、高并發(fā)的寫(xiě)入。
第三是快速讀取,當(dāng)數(shù)據(jù)寫(xiě)入后我們需要對(duì)數(shù)據(jù)進(jìn)行分析,這其中涉及到兩個(gè)問(wèn)題,第一個(gè)問(wèn)題是需要支持細(xì)粒度的并發(fā),當(dāng)作業(yè)使用多個(gè) task 來(lái)讀取時(shí)可以保證為各個(gè) task 進(jìn)行均衡的分配以此來(lái)加速數(shù)據(jù)的計(jì)算;第二個(gè)問(wèn)題是我們要充分發(fā)揮列式存儲(chǔ)的優(yōu)勢(shì)來(lái)加速讀取。
第四是支持增量讀,例如一些傳統(tǒng)數(shù)倉(cāng)中的 ETL,通過(guò)增量讀取來(lái)進(jìn)行進(jìn)一步數(shù)據(jù)轉(zhuǎn)換。

3.3 Apache Iceberg Basic

下圖展示了在 iceberg 中 snapshot、manifest 及 partition 中的文件的對(duì)應(yīng)關(guān)系。下圖中包含了三個(gè) partition,第一個(gè) partition 中有兩個(gè)文件 f1、f3,第二個(gè) partition 有兩個(gè)文件f4、f5,第三個(gè) partition 有一個(gè)文件f2。對(duì)于每一次寫(xiě)入都會(huì)生成一個(gè) manifest 文件,該文件記錄本次寫(xiě)入的文件與 partition 的對(duì)應(yīng)關(guān)系。再向上層有 snapshot 的概念,snapshot 能夠幫助快速訪問(wèn)到整張表的全量數(shù)據(jù),snapshot 記錄多個(gè) manifest,如第二個(gè) snapshot 包含 manifest2 和 manifest3。

3.4 INSERT、UPDATE、DELETE 寫(xiě)入
在了解了基本的概念,下面介紹 iceberg 中 insert、update、delete 操作的設(shè)計(jì)。
下圖示例的 SQL 中展示的表包含兩個(gè)字段即 id、data,兩個(gè)字段都是 int 類型。在一個(gè) transaction 中我們進(jìn)行了圖示中的數(shù)據(jù)流操作,首先插入了(1,2)一條記錄,接下來(lái)將這條記錄更新為(1,3),在 iceberg 中 update 操作將會(huì)拆為 delete 和 insert 兩個(gè)操作。
這么做的原因是考慮到 iceberg 作為流批統(tǒng)一的存儲(chǔ)層,將 update 操作拆解為 delete 和 insert 操作可以保證流批場(chǎng)景做更新時(shí)讀取路徑的統(tǒng)一,如在批量刪除的場(chǎng)景下以 Hive 為例,Hive 會(huì)將待刪除的行的文件 offset 寫(xiě)入到 delta 文件中,然后做一次 merge on read,因?yàn)檫@樣會(huì)比較快,在 merge 時(shí)通過(guò) position 將原文件和 delta 進(jìn)行映射,將會(huì)很快得到所有未刪除的記錄。
接下來(lái)又插入記錄(3,5),刪除了記錄(1,3),插入記錄(2,5),最終查詢是我們得到記錄(3,5)(2,5)。

上面操作看上去非常簡(jiǎn)單,但在實(shí)現(xiàn)中是存在一些語(yǔ)義上的問(wèn)題。如下圖中,在一個(gè) transaction 中首先執(zhí)行插入記錄(1,2)的操作,該操作會(huì)在 data file1 文件中寫(xiě)入 INSERT(1,2),然后執(zhí)行刪除記錄(1,2)操作,該操作會(huì)在 equalify delete file1 中寫(xiě)入 DELETE(1,2),接著又執(zhí)行插入記錄(1,2)操作,該操作會(huì)在 data file1 文件中再寫(xiě)入INSERT(1,2),然后執(zhí)行查詢操作。
在正常情況下查詢結(jié)果應(yīng)該返回記錄 INSERT(1,2),但在實(shí)現(xiàn)中,DELETE(1,2)操作無(wú)法得知?jiǎng)h除的是 data file1 文件中的哪一行,因此兩行 INSERT(1,2)記錄都將被刪除。

那么如何來(lái)解決這個(gè)問(wèn)題呢,社區(qū)當(dāng)前的方式是采用了 Mixed position-delete and equality-delete。Equality-delete 即通過(guò)指定一列或多列來(lái)進(jìn)行刪除操作,position-delete 是根據(jù)文件路徑和行號(hào)來(lái)進(jìn)行刪除操作,通過(guò)將這兩種方法結(jié)合起來(lái)以保證刪除操作的正確性。
如下圖我們?cè)诘谝粋€(gè) transaction 中插入了三行記錄,即 INSERT(1,2)、INSERT(1,3)、INSERT(1,4),然后執(zhí)行 commit 操作進(jìn)行提交。接下來(lái)我們開(kāi)啟一個(gè)新的 transaction 并執(zhí)行插入一行數(shù)據(jù)(1,5),由于是新的 transaction,因此新建了一個(gè) data file2 并寫(xiě)入 INSERT(1,5)記錄,接下來(lái)執(zhí)行刪除記錄(1,5),實(shí)際寫(xiě)入 delete 時(shí)是:
在 position delete file1 文件寫(xiě)入(file2, 0),表示刪除 data file2 中第 0 行的記錄,這是為了解決同一個(gè) transaction 內(nèi)同一行數(shù)據(jù)反復(fù)插入刪除的語(yǔ)義的問(wèn)題。
在 equality delete file1 文件中寫(xiě)入 DELETE (1,5),之所以寫(xiě)入這個(gè) delete 是為了確保本次 txn 之前寫(xiě)入的 (1,5) 能被正確刪除。
然后執(zhí)行刪除(1,4)操作,由于(1,4)在當(dāng)前 transaction 中未曾插入過(guò),因此該操作會(huì)使用 equality-delete 操作,即在 equality delete file1 中寫(xiě)入(1,4)記錄。在上述流程中可以看出在當(dāng)前方案中存在 data file、position delete file、equality delete file 三類文件。

在了解了寫(xiě)入流程后,如何來(lái)讀取呢。如下圖所示,對(duì)于 position delete file 中的記錄(file2, 0)只需和當(dāng)前 transaction 的 data file 進(jìn)行 join 操作,對(duì)于 equality delete file 記錄(1,4)和之前的 transaction 中的 data file 進(jìn)行 join 操作。最終得到記錄 INSERT(1,3)、INSERT(1,2)保證了流程的正確性。

3.5 Manifest 文件的設(shè)計(jì)
上面介紹了 insert、update 及 delete,但在設(shè)計(jì) task 的執(zhí)行計(jì)劃時(shí)我們對(duì) manifest 進(jìn)行了一些設(shè)計(jì),目的是通過(guò) manifest 能夠快速到找到 data file,并按照數(shù)據(jù)大小進(jìn)行分割,保證每個(gè) task 處理的數(shù)據(jù)盡可能的均勻分布。
如下圖示例,包含四個(gè) transaction,前兩個(gè) transaction 是 INSERT 操作,對(duì)應(yīng) M1、M2,第三個(gè) transaction 是 DELETE 操作,對(duì)應(yīng) M3,第四個(gè) transaction 是 UPDATE 操作,包含兩個(gè) manifest 文件即 data manifest 和 delete manifest。

對(duì)于為什么要對(duì) manifest 文件拆分為 data manifest 和 delete manifest 呢,本質(zhì)上是為了快速為每個(gè) data file 找到對(duì)應(yīng)的 delete file 列表。可以看下圖示例,當(dāng)我們?cè)?partition-2 做讀取時(shí),需要將 deletefile-4 與datafile-2、datafile-3 做一個(gè) join 操作,同樣也需要將 deletefile-5 與 datafile-2、datafile-3 做一個(gè) join 操作。
以 datafile-3 為例,deletefile 列表包含 deletefile-4 和 deletefile-5 兩個(gè)文件,如何快速找到對(duì)應(yīng)的 deletefIle 列表呢,我們可以根據(jù)上層的 manifest 來(lái)進(jìn)行查詢,當(dāng)我們將 manifest 文件拆分為 data manifest 和 delete manifest 后,可以將 M2(data manifest)與 M3、M4(delete manifest)先進(jìn)行一次 join 操作,這樣便可以快速的得到 data file 所對(duì)應(yīng)的 delete file 列表。

3.6 文件級(jí)別的并發(fā)
另一個(gè)問(wèn)題是我們需要保證足夠高的并發(fā)讀取,在 iceberg 中這點(diǎn)做得非常出色。在 iceberg 中可以做到文件級(jí)別的并發(fā)讀取,甚至文件中更細(xì)粒度的分段的并發(fā)讀取,比如文件有 256MB,可以分為兩個(gè) 128MB 進(jìn)行并發(fā)讀取。這里舉例說(shuō)明,假設(shè) insert 文件跟 delete 文件在兩個(gè) Bucket 中的布局方式如下圖所示。


第一點(diǎn),如果同一個(gè) task 內(nèi)的 delete file 有重復(fù)可以做緩存處理,這樣可以提高 join 的效率。 第二點(diǎn),當(dāng) delete file 比較大需要溢寫(xiě)到磁盤(pán)時(shí)可以使用 kv lib 來(lái)做優(yōu)化,但這不依賴外部服務(wù)或其他繁重的索引。 第三點(diǎn),可以設(shè)計(jì) Bloom filter(布隆過(guò)濾器)來(lái)過(guò)濾無(wú)效的 IO,因?yàn)閷?duì)于 Flink 中常用的 upsert 操作會(huì)產(chǎn)生一個(gè) delete 操作和一個(gè) insert 操作,這會(huì)導(dǎo)致在 iceberg 中 data file 和 delete file 大小相差不大,這樣 join 的效率不會(huì)很高。如果采用 Bloom Filter,當(dāng) upsert 數(shù)據(jù)到來(lái)時(shí),拆分為 insert 和 delete 操作,如果通過(guò) bloom filter 過(guò)濾掉那些之前沒(méi)有 insert 過(guò)數(shù)據(jù)的 delete 操作(即如果這條數(shù)據(jù)之前沒(méi)有插入過(guò),則不需要將 delete 記錄寫(xiě)入到 delete file 中),這將極大的提高 upsert 的效率。 第四點(diǎn),是需要一些后臺(tái)的 compaction 策略來(lái)控制 delete file 文件大小,當(dāng) delete file 越少,分析的效率越高,當(dāng)然這些策略并不會(huì)影響正常的讀寫(xiě)。

3.7 增量文件集的 Transaction 提交
前面介紹了文件的寫(xiě)入,下圖我們介紹如何按照 iceberg 的語(yǔ)義進(jìn)行寫(xiě)入并且供用戶讀取。主要分為數(shù)據(jù)和 metastore 兩部分,首先會(huì)有 IcebergStreamWriter 進(jìn)行數(shù)據(jù)的寫(xiě)入,但此時(shí)寫(xiě)入數(shù)據(jù)的元數(shù)據(jù)信息并沒(méi)有寫(xiě)入到 metastore,因此對(duì)外不可見(jiàn)。第二個(gè)算子是 IcebergFileCommitter,該算子會(huì)將數(shù)據(jù)文件進(jìn)行收集, 最終通過(guò) commit transaction 來(lái)完成寫(xiě)入。
在 Iceberg 中并沒(méi)有其他任何其他第三方服務(wù)的依賴,而 Hudi 在某些方面做了一些 service 的抽象,如將 metastore 抽象為獨(dú)立的 Timeline,這可能會(huì)依賴一些獨(dú)立的索引甚至是其他的外部服務(wù)來(lái)完成。

四、未來(lái)規(guī)劃

下面是我們未來(lái)的一些規(guī)劃,首先是 Iceberg 內(nèi)核的一些優(yōu)化,包括方案中涉及到的全鏈路穩(wěn)定性測(cè)試及性能的優(yōu)化, 并提供一些 CDC 增量拉取的相關(guān) Table API 接口。
在 Flink 集成上,會(huì)實(shí)現(xiàn) CDC 數(shù)據(jù)的自動(dòng)和手動(dòng)合并數(shù)據(jù)文件的能力,并提供 Flink 增量拉取 CDC 數(shù)據(jù)的能力。
在其他生態(tài)集成上,我們會(huì)對(duì) Spark、Presto 等引擎進(jìn)行集成,并借助 Alluxio 加速數(shù)據(jù)查詢。
今天的分享就到這里,謝謝大家。
