Iceberg 實(shí)踐 | Flink + Iceberg + 對(duì)象存儲(chǔ),構(gòu)建數(shù)據(jù)湖方案
數(shù)據(jù)湖和 Iceberg 簡(jiǎn)介
未來(lái)規(guī)劃
演示方案 存儲(chǔ)優(yōu)化的一些思考
GitHub 地址 
一、數(shù)據(jù)湖和 Iceberg 簡(jiǎn)介
1. 數(shù)據(jù)湖生態(tài)

首先我們認(rèn)為它底下應(yīng)具備海量存儲(chǔ)的能力,常見(jiàn)的有對(duì)象存儲(chǔ),公有云存儲(chǔ)以及 HDFS;
在這之上,也需要支持豐富的數(shù)據(jù)類型,包括非結(jié)構(gòu)化的圖像視頻,半結(jié)構(gòu)化的 CSV、XML、Log,以及結(jié)構(gòu)化的數(shù)據(jù)庫(kù)表;
除此之外,需要高效統(tǒng)一的元數(shù)據(jù)管理,使得計(jì)算引擎可以方便地索引到各種類型數(shù)據(jù)來(lái)做分析。
最后,我們需要支持豐富的計(jì)算引擎,包括 Flink、Spark、Hive、Presto 等,從而方便對(duì)接企業(yè)中已有的一些應(yīng)用架構(gòu)。
2. 結(jié)構(gòu)化數(shù)據(jù)在數(shù)據(jù)湖上的應(yīng)用場(chǎng)景

首先,可以看到數(shù)據(jù)源的類型很多,因此需要支持比較豐富的數(shù)據(jù) Schema 的組織; 其次,它在注入的過(guò)程中要支撐實(shí)時(shí)的數(shù)據(jù)查詢,所以需要 ACID 的保證,確保不會(huì)讀到一些還沒(méi)寫完的中間狀態(tài)的臟數(shù)據(jù); 最后,例如日志這些有可能臨時(shí)需要改個(gè)格式,或者加一列。類似這種情況,需要避免像傳統(tǒng)的數(shù)倉(cāng)一樣,可能要把所有的數(shù)據(jù)重新提出來(lái)寫一遍,重新注入到存儲(chǔ);而是需要一個(gè)輕量級(jí)的解決方案來(lái)達(dá)成需求。
3. 結(jié)構(gòu)化數(shù)據(jù)在數(shù)據(jù)湖上的典型解決方案

4. Iceberg 表數(shù)據(jù)組織架構(gòu)

快照 Metadata:表格 Schema、Partition、Partition spec、Manifest List 路徑、當(dāng)前快照等。 Manifest List:Manifest File 路徑及其 Partition,數(shù)據(jù)文件統(tǒng)計(jì)信息。 Manifest File:Data File 路徑及其每列數(shù)據(jù)上下邊界。 Data File:實(shí)際表內(nèi)容數(shù)據(jù),以 Parque,ORC,Avro 等格式組織。
可以看到右邊從數(shù)據(jù)文件開始,數(shù)據(jù)文件存放表內(nèi)容數(shù)據(jù),一般支持 Parquet、ORC、Avro 等格式; 往上是 Manifest File,它會(huì)記錄底下數(shù)據(jù)文件的路徑以及每列數(shù)據(jù)的上下邊界,方便過(guò)濾查詢文件; 再往上是 Manifest List,它來(lái)鏈接底下多個(gè) Manifest File,同時(shí)記錄 Manifest File 對(duì)應(yīng)的分區(qū)范圍信息,也是為了方便后續(xù)做過(guò)濾查詢; Manifest List 其實(shí)已經(jīng)表示了快照的信息,它包含當(dāng)下數(shù)據(jù)庫(kù)表所有的數(shù)據(jù)鏈接,也是 Iceberg 能夠支持 ACID 特性的關(guān)鍵保障。 有了快照,讀數(shù)據(jù)的時(shí)候只能讀到快照所能引用到的數(shù)據(jù),還在寫的數(shù)據(jù)不會(huì)被快照引用到,也就不會(huì)讀到臟數(shù)據(jù)。多個(gè)快照會(huì)共享以前的數(shù)據(jù)文件,通過(guò)共享這些 Manifest File 來(lái)共享之前的數(shù)據(jù)。 再往上是快照元數(shù)據(jù),記錄了當(dāng)前或者歷史上表格 Scheme 的變化、分區(qū)的配置、所有快照 Manifest File 路徑、以及當(dāng)前快照是哪一個(gè)。 同時(shí),Iceberg 提供命名空間以及表格的抽象,做完整的數(shù)據(jù)組織管理。
5. Iceberg 寫入流程

首先,Data Workers 會(huì)從元數(shù)據(jù)上讀出數(shù)據(jù)進(jìn)行解析,然后把一條記錄交給 Iceberg 存儲(chǔ); 與常見(jiàn)的數(shù)據(jù)庫(kù)一樣,Iceberg 也會(huì)有預(yù)定義的分區(qū),那些記錄會(huì)寫入到各個(gè)不同的分區(qū),形成一些新的文件; Flink 有個(gè) CheckPoint 機(jī)制,文件到達(dá)以后,F(xiàn)link 就會(huì)完成這一批文件的寫入,然后生成這一批文件的清單,接著交給 Commit Worker; Commit Worker 會(huì)讀出當(dāng)前快照的信息,然后與這一次生成的文件列表進(jìn)行合并,生成一個(gè)新的 Manifest List 以及后續(xù)元數(shù)據(jù)的表文件的信息,之后進(jìn)行提交,成功以后就形成一個(gè)新的快照。
6. Iceberg 查詢流程

首先是 Flink Table scan worker 做一個(gè) scan,scan 的時(shí)候可以像樹一樣,從根開始,找到當(dāng)前的快照或者用戶指定的一個(gè)歷史快照,然后從快照中拿出當(dāng)前快照的 Manifest List 文件,根據(jù)當(dāng)時(shí)保存的一些信息,就可以過(guò)濾出滿足這次查詢條件的 Manifest File; 再往下經(jīng)過(guò) Manifest File 里記錄的信息,過(guò)濾出底下需要的 Data Files。這個(gè)文件拿出來(lái)以后,再交給 Recorder reader workers,它從文件中讀出滿足條件的 Recode,然后返回給上層調(diào)用。
7. Iceberg Catalog 功能一覽

它可以對(duì) Iceberg 定義一系列角色文件; 它的 File IO 都是可以定制,包括讀寫和刪除; 它的命名空間和表的操作 (也可稱為元數(shù)據(jù)操作),也可以定制; 包括表的讀取 / 掃描,表的提交,都可以用 Catalog 來(lái)定制。
二、對(duì)象存儲(chǔ)支撐 Iceberg 數(shù)據(jù)湖
1. 當(dāng)前 Iceberg Catalog 實(shí)現(xiàn)

2. 對(duì)象存儲(chǔ)和 HDFS 的比較

對(duì)象存儲(chǔ)在集群擴(kuò)展性,小文件友好,多站點(diǎn)部署和低存儲(chǔ)開銷上更加有優(yōu)勢(shì); HDFS 的好處就是提供追加上傳和原子性 rename,這兩個(gè)優(yōu)勢(shì)正是 Iceberg 需要的。

HDFS 架構(gòu)是用單個(gè) Name Node 保存所有元數(shù)據(jù),這就決定了它單節(jié)點(diǎn)的能力有限,所以在元數(shù)據(jù)方面沒(méi)有橫向擴(kuò)展能力。 對(duì)象存儲(chǔ)一般采用哈希方式,把元數(shù)據(jù)分隔成各個(gè)塊,把這個(gè)塊交給不同 Node 上面的服務(wù)來(lái)進(jìn)行管理,天然地它元數(shù)據(jù)的上限會(huì)更高,甚至在極端情況下可以進(jìn)行 rehash,把這個(gè)塊切得更細(xì),交給更多的 Node 來(lái)管理元數(shù)據(jù),達(dá)到擴(kuò)展能力。

HDFS 基于架構(gòu)的限制,小文件存儲(chǔ)受限于 Name Node 內(nèi)存等資源,雖然 HDFS 提供了 Archive 的方法來(lái)合并小文件,減少對(duì) Name Node 的壓力,但這需要額外增加復(fù)雜度,不是原生的。 同樣,小文件的 TPS 也是受限于 Name Node 的處理能力,因?yàn)樗挥袉蝹€(gè) Name Node。對(duì)象存儲(chǔ)的元數(shù)據(jù)是分布式存儲(chǔ)和管理,流量可以很好地分布到各個(gè) Node 上,這樣單節(jié)點(diǎn)就可以存儲(chǔ)海量的小文件。 目前,很多對(duì)象存儲(chǔ)提供多介質(zhì),分層加速,可以提升小文件的性能。

對(duì)象存儲(chǔ)支持多站點(diǎn)部署 全局命名空間 支持豐富的規(guī)則配置 對(duì)象存儲(chǔ)的多站點(diǎn)部署能力適用于兩地三中心多活的架構(gòu),而 HDFS 沒(méi)有原生的多站點(diǎn)部署能力。雖然目前看到一些商業(yè)版本給 HDFS 增加了多站點(diǎn)負(fù)責(zé)數(shù)據(jù)的能力,但由于它的兩個(gè)系統(tǒng)可能是獨(dú)立的,因此并不能支撐真正的全局命名空間下多活的能力。

對(duì)于存儲(chǔ)系統(tǒng)來(lái)說(shuō),為了適應(yīng)隨機(jī)的硬件故障,它一般會(huì)有副本機(jī)制來(lái)保護(hù)數(shù)據(jù)。 常見(jiàn)的如三副本,把數(shù)據(jù)存三份,然后分開保存到三個(gè) Node 上面,存儲(chǔ)開銷是三倍,但是它可以同時(shí)容忍兩個(gè)副本遇到故障,保證數(shù)據(jù)不會(huì)丟失。 另一種是 Erasure Coding,通常稱為 EC。以 10+2 舉例,它把數(shù)據(jù)切成 10 個(gè)數(shù)據(jù)塊,然后用算法算出兩個(gè)代碼塊,一共 12 個(gè)塊。接著分布到四個(gè)節(jié)點(diǎn)上,存儲(chǔ)開銷是 1.2 倍。它同樣可以容忍同時(shí)出現(xiàn)兩個(gè)塊故障,這種情況可以用剩余的 10 個(gè)塊算出所有的數(shù)據(jù),這樣減少存儲(chǔ)開銷,同時(shí)達(dá)到故障容忍程度。 HDFS 默認(rèn)使用三副本機(jī)制,新的 HDFS 版本上已經(jīng)支持 EC 的能力。經(jīng)過(guò)研究,它是基于文件做 EC,所以它對(duì)小文件有天然的劣勢(shì)。因?yàn)槿绻∥募拇笮⌒∮诜謮K要求的大小時(shí),它的開銷就會(huì)比原定的開銷更大,因?yàn)閮蓚€(gè)代碼塊這邊是不能省的。在極端情況下,如果它的大小等同于單個(gè)代碼塊的大小,它就已經(jīng)等同于三副本了。 同時(shí),HDFS 一旦 EC,就不能再支持 append、hflush、hsync 等操作,這會(huì)極大地影響 EC 能夠使用的場(chǎng)景。對(duì)象存儲(chǔ)原生支持 EC,對(duì)于小文件的話,它內(nèi)部會(huì)把小文件合并成一個(gè)大的塊來(lái)做 EC,這樣確保數(shù)據(jù)開銷方面始終是恒定的,基于預(yù)先配置的策略。
3. 對(duì)象存儲(chǔ)的挑戰(zhàn):數(shù)據(jù)的追加上傳



第一步先創(chuàng)建初始化的 MPU,拿到一個(gè) Upload ID,然后給每一個(gè)分段賦予一個(gè) Upload ID 以及一個(gè)編號(hào),這些分塊就可以并行上傳; 在上傳完成以后,還需要一步 Complete 操作,這樣相當(dāng)于通知系統(tǒng),它會(huì)把基于同一個(gè) Upload ID 以及所有的編號(hào),從小到大排起來(lái),組成一個(gè)大文件; 把機(jī)制運(yùn)用到數(shù)據(jù)追加上傳場(chǎng)景,常規(guī)實(shí)現(xiàn)就是寫入一個(gè)文件,把文件緩存到本地,當(dāng)達(dá)到分塊要求大小時(shí),就可以把它進(jìn)行初始化 MPU,把它的一個(gè)分塊開始上傳。后面每一個(gè)分塊也是一樣的操作,直到最后一個(gè)分塊上傳完,最后再調(diào)用一個(gè)完成操作來(lái)完成上傳。
缺點(diǎn)是 MPU 的分片數(shù)量有上限,S3 標(biāo)準(zhǔn)里可能只有 1 萬(wàn)個(gè)分片。想支持大文件的話,這個(gè)分塊就不能太小,所以對(duì)于小于分塊的文件,依然是要利用前面一種方法進(jìn)行緩存上傳; MPU 的優(yōu)點(diǎn)在于并行上傳的能力。假設(shè)做一個(gè)異步的上傳,文件在緩存達(dá)到以后,不用等上一個(gè)分塊上傳成功,就可以繼續(xù)緩存下一個(gè),之后開始上傳。當(dāng)前面注入的速度足夠快時(shí),后端的異步提交就變成了并行操作。利用這個(gè)機(jī)制,它可以提供比單條流上傳速度更快的上傳能力。
4. 對(duì)象存儲(chǔ)的挑戰(zhàn):原子提交


這里 Commit Worker 1 拿到了 v006 版本,然后合并自己的文件,提交 v007 成功。 此時(shí)還有另一個(gè) Commit Worker 2,它也拿到了 v006,然后合并出來(lái),且也要提供 v007。此時(shí)我們需要一個(gè)機(jī)制告訴它 v007 已經(jīng)沖突,不能上傳,然后讓它自己去 Retry。Retry 以后取出新的 v007 合并,然后提交給 v008。

首先,Commit Worker 1 拿到 v006,然后合并文件,在提交之前先要獲取這一把鎖,拿到鎖以后判斷當(dāng)前快照版本。如果是 v006,則 v007 能提交成功,提交成功以后再解鎖。 同樣,Commit Worker 2 拿到 v006 合并以后,它一開始拿不到鎖,要等 Commit Worker 1 釋放掉這個(gè)鎖以后才能拿到。等拿到鎖再去檢查的時(shí)候,會(huì)發(fā)現(xiàn)當(dāng)前版本已經(jīng)是 v007,與自己的 v007 有沖突,因此這個(gè)操作一定會(huì)失敗,然后它就會(huì)進(jìn)行 Retry。
5. Dell EMC ECS 的數(shù)據(jù)追加上傳

6. Dell EMC ECS 在并發(fā)提交下的解決方案

If-Match 就是說(shuō)在 Commit Worker 1 提交拿到 v006 的時(shí)候,同時(shí)拿到了文件的 eTag。提交的時(shí)候會(huì)帶上 eTag,系統(tǒng)需要判斷要覆蓋文件的 eTag 跟當(dāng)前這個(gè)文件真實(shí) eTag 是否相同,如果相同就允許這次覆蓋操作,那么 v007 就能提交成功; 另一種情況,是 Commit Worker 2 也拿到了 v006 的 eTag,然后上傳的時(shí)候發(fā)現(xiàn)拿到 eTag 跟當(dāng)前系統(tǒng)里文件不同,則會(huì)返回失敗,然后觸發(fā) Retry。
7. S3 Catalog - 統(tǒng)一存儲(chǔ)的數(shù)據(jù)

三、演示方案
四、存儲(chǔ)優(yōu)化的一些思考
另外~《Apache Flink-實(shí)時(shí)計(jì)算正當(dāng)時(shí)》電子書重磅發(fā)布,本書將助您輕松 Get Apache Flink 1.13 版本最新特征,同時(shí)還包含知名廠商多場(chǎng)景 Flink 實(shí)戰(zhàn)經(jīng)驗(yàn),學(xué)用一體,干貨多多!快掃描下方二維碼獲取吧~
(本次為搶鮮版,正式版將于 7 月初上線)

更多 Flink 相關(guān)技術(shù)交流,可掃碼加入社區(qū)釘釘大群~
▼ 關(guān)注「Flink 中文社區(qū)」,獲取更多技術(shù)干貨 ▼
戳我,立即報(bào)名!

