1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Iceberg 實踐 | 汽車之家:基于 Flink + Iceberg 的湖倉一體架構(gòu)實踐

        共 5994字,需瀏覽 12分鐘

         ·

        2021-06-12 01:02

        摘要:由汽車之家實時計算平臺負(fù)責(zé)人邸星星在 4 月 17 日上海站 Meetup 分享的,基于 Flink + Iceberg 的湖倉一體架構(gòu)實踐,內(nèi)容包括:


        1. 數(shù)據(jù)倉庫架構(gòu)升級的背景

        2. 基于 Iceberg 的湖倉一體架構(gòu)實踐

        3. 總結(jié)與收益

        4. 后續(xù)規(guī)劃


        Tips:點擊文閱讀原文即可查看原文視頻~

         GitHub 地址 
        https://github.com/apache/flink
        歡迎大家給 Flink 點贊送 star~

        一、數(shù)據(jù)倉庫架構(gòu)升級的背景


        1. 基于 Hive 的數(shù)據(jù)倉庫的痛點


        原有的數(shù)據(jù)倉庫完全基于 Hive 建造而成,主要存在三大痛點:


        痛點一:不支持 ACID


        1)不支持 Upsert 場景;


        2)不支持 Row-level delete,數(shù)據(jù)修正成本高。


        痛點二:時效性難以提升


        1)數(shù)據(jù)難以做到準(zhǔn)實時可見;


        2)無法增量讀取,無法實現(xiàn)存儲層面的流批統(tǒng)一;


        3)無法支持分鐘級延遲的數(shù)據(jù)分析場景。


        痛點三:Table Evolution


        1)寫入型 Schema,對 Schema 變更支持不好;


        2)Partition Spec 變更支持不友好。


        2. Iceberg 關(guān)鍵特性


        Iceberg 主要有四大關(guān)鍵特性:支持 ACID 語義、增量快照機制、開放的表格式和流批接口支持。


        • 支持 ACID 語義


          • 不會讀到不完整的 Commit;

          • 基于樂觀鎖支持并發(fā) Commit;

          • Row-level delete,支持 Upsert。


        • 增量快照機制


          • Commit 后數(shù)據(jù)即可見(分鐘級);

          • 可回溯歷史快照。


        • 開放的表格式


          • 數(shù)據(jù)格式:parquet、orc、avro

          • 計算引擎:Spark、Flink、Hive、Trino/Presto


        • 流批接口支持


          • 支持流、批寫入;

          • 支持流、批讀取。


        二、基于 Iceberg 的湖倉一體架構(gòu)實踐


        湖倉一體的意義就是說我不需要看見湖和倉,數(shù)據(jù)有著打通的元數(shù)據(jù)的格式,它可以自由的流動,也可以對接上層多樣化的計算生態(tài)。

        ——賈揚清(阿里云計算平臺高級研究員)


        1. Append 流入湖的鏈路



        上圖為日志類數(shù)據(jù)入湖的鏈路,日志類數(shù)據(jù)包含客戶端日志、用戶端日志以及服務(wù)端日志。這些日志數(shù)據(jù)會實時錄入到 Kafka,然后通過 Flink 任務(wù)寫到 Iceberg 里面,最終存儲到 HDFS。


        2. Flink SQL 入湖鏈路打通


        我們的 Flink SQL 入湖鏈路打通是基于 “Flink 1.11 + Iceberg 0.11” 完成的,對接 Iceberg Catalog 我們主要做了以下內(nèi)容:


        1)Meta Server 增加對 Iceberg Catalog 的支持;


        2)SQL SDK 增加 Iceberg Catalog 支持。


        然后在這基礎(chǔ)上,平臺開放 Iceberg 表的管理功能,使得用戶可以自己在平臺上建 SQL 的表。


        3. 入湖 - 支持代理用戶


        第二步是內(nèi)部的實踐,對接現(xiàn)有預(yù)算體系、權(quán)限體系。


        因為之前平臺做實時作業(yè)的時候,平臺都是默認(rèn)為 Flink 用戶去運行的,之前存儲不涉及 HDFS 存儲,因此可能沒有什么問題,也就沒有思考預(yù)算劃分方面的問題。


        但是現(xiàn)在寫 Iceberg 的話,可能就會涉及一些問題。比如數(shù)倉團隊有自己的集市,數(shù)據(jù)就應(yīng)該寫到他們的目錄下面,預(yù)算也是劃到他們的預(yù)算下,同時權(quán)限和離線團隊賬號的體系打通。



        如上所示,這塊主要是在平臺上做了代理用戶的功能,用戶可以去指定用哪個賬號去把這個數(shù)據(jù)寫到 Iceberg 里面,實現(xiàn)過程主要有以下三個。


        • 增加 Table 級別配置:'iceberg.user.proxy' = 'targetUser’


        1)啟用 Superuser
        2)團隊賬號鑒權(quán)


        • 訪問 HDFS 時啟用代理用戶:



        • 訪問 Hive Metastore 時指定代理用戶

          1)參考 Spark 的相關(guān)實現(xiàn):

          org.apache.spark.deploy.security.HiveDelegationTokenProvider

          2)動態(tài)代理 HiveMetaStoreClient,使用代理用戶訪問 Hive metastore


        4. Flink SQL 入湖示例


        DDL + DML



        5. CDC 數(shù)據(jù)入湖鏈路



        如上所示,我們有一個 AutoDTS 平臺,負(fù)責(zé)業(yè)務(wù)庫數(shù)據(jù)的實時接入。我們會把這些業(yè)務(wù)庫的數(shù)據(jù)接入到 Kafka 里面,同時它還支持在平臺上配置分發(fā)任務(wù),相當(dāng)于把進 Kafka 的數(shù)據(jù)分發(fā)到不同的存儲引擎里,在這個場景下是分發(fā)到 Iceberg 里。


        6. Flink SQL CDC 入湖鏈路打通


        下面是我們基于 “Flink1.11 + Iceberg 0.11” 支持 CDC 入湖所做的改動:


        • 改進 Iceberg Sink:

          Flink 1.11 版本為 AppendStreamTableSink,無法處理 CDC 流,修改并適配。


        • 表管理

          1)支持 Primary key(PR1978)

          2)開啟 V2 版本:'iceberg.format.version' = '2'


        7. CDC 數(shù)據(jù)入湖


        ■ 1. 支持 Bucket


        Upsert 場景下,需要確保同一條數(shù)據(jù)寫入到同一 Bucket 下,這又如何實現(xiàn)?

        目前 Flink SQL 語法不支持聲明 bucket 分區(qū),通過配置的方式聲明 Bucket:

        'partition.bucket.source'='id', // 指定 bucket 字段

        'partition.bucket.num'='10',   // 指定 bucket 數(shù)量


        ■ 2. Copy-on-write sink


        做 Copy-on-Write 的原因是原本社區(qū)的 Merge-on-Read 不支持合并小文件,所以我們臨時去做了 Copy-on-write sink 的實現(xiàn)。目前業(yè)務(wù)一直在測試使用,效果良好。



        上方為 Copy-on-Write 的實現(xiàn),其實跟原來的 Merge-on-Read 比較類似,也是有 StreamWriter 多并行度寫入和 FileCommitter 單并行度順序提交。


        在 Copy-on-Write 里面,需要根據(jù)表的數(shù)據(jù)量合理設(shè)置 Bucket 數(shù),無需額外做小文件合并。


        • StreamWriter 在 snapshotState 階段多并行度寫入

          1)增加 Buffer;

          2)寫入前需要判斷上次 checkpoint 已經(jīng) commit 成功;

          3)按 bucket 分組、合并,逐個 Bucket 寫入。


        • FileCommitter 單并行度順序提交

          1)table.newOverwrite()

          2)Flink.last.committed.checkpoint.id



        8. 示例 - CDC 數(shù)據(jù)配置入湖



        如上圖所示,在實際使用中,業(yè)務(wù)方可以在 DTS 平臺上創(chuàng)建或配置分發(fā)任務(wù)即可。


        實例類型選擇 Iceberg 表,然后選擇目標(biāo)庫,表明要把哪個表的數(shù)據(jù)同步到 Iceberg 里,然后可以選原表和目標(biāo)表的字段的映射關(guān)系是什么樣的,配置之后就可以啟動分發(fā)任務(wù)。啟動之后,會在實時計算平臺 Flink 里面提交一個實時任務(wù),接著用 Copy-on-write sink 去實時地把數(shù)據(jù)寫到 Iceberg 表里面。



        9. 入湖其他實踐


        實踐一:減少 empty commit


        • 問題描述:

          在上游 Kafka 長期沒有數(shù)據(jù)的情況下,每次 Checkpoint 依舊會生成新的 Snapshot,導(dǎo)致大量的空文件和不必要的 Snapshot。


        • 解決方案(PR - 2042):

          增加配置 Flink.max-continuousempty-commits,在連續(xù)指定次數(shù) Checkpoint 都沒有數(shù)據(jù)后才真正觸發(fā) Commit,生成 Snapshot。


        實踐二:記錄 watermark


        • 問題描述:

          目前 Iceberg 表本身無法直接反映數(shù)據(jù)寫入的進度,離線調(diào)度難以精準(zhǔn)觸發(fā)下游任務(wù)。


        • 解決方案( PR - 2109 ):

          在 Commit 階段將 Flink 的 Watermark 記錄到 Iceberg 表的 Properties 中,可直觀的反映端到端的延遲情況,同時可以用來判斷分區(qū)數(shù)據(jù)完整性,用于調(diào)度觸發(fā)下游任務(wù)。


        實踐三:刪表優(yōu)化


        • 問題描述:

          刪除 Iceberg 可能會很慢,導(dǎo)致平臺接口相應(yīng)超時。因為 Iceberg 是面向?qū)ο蟠鎯沓橄?IO 層的,沒有快速清除目錄的方法。

        • 解決方案:

          擴展 FileIO,增加 deleteDir 方法,在 HDFS 上快速刪除表數(shù)據(jù)。


        10. 小文件合并及數(shù)據(jù)清理


        定期為每個表執(zhí)行批處理任務(wù)(spark 3),分為以下三個步驟:


        1. 定期合并新增分區(qū)的小文件:


        rewriteDataFilesAction.execute(); 僅合并小文件,不會刪除舊文件。


        2. 刪除過期的 snapshot,清理元數(shù)據(jù)及數(shù)據(jù)文件:


        table.expireSnapshots().expireOld erThan(timestamp).commit();


        3. 清理 orphan 文件,默認(rèn)清理 3 天前,且無法觸及的文件:


        removeOrphanFilesAction.older Than(timestamp).execute();


        11. 計算引擎 – Flink


        Flink 是實時平臺的核心計算引擎,目前主要支持?jǐn)?shù)據(jù)入湖場景,主要有以下幾個方面的特點。


        • 數(shù)據(jù)準(zhǔn)實時入湖:

          Flink 和 Iceberg 在數(shù)據(jù)入湖方面集成度最高,F(xiàn)link 社區(qū)主動擁抱數(shù)據(jù)湖技術(shù)。


        • 平臺集成:

          AutoStream 引入 IcebergCatalog,支持通過 SQL 建表、入湖 AutoDTS 支持將 MySQL、SQLServer、TiDB 表配置入湖。


        • 流批一體:

          在流批一體的理念下,F(xiàn)link 的優(yōu)勢會逐漸體現(xiàn)出來。


        12. 計算引擎 – Hive


        Hive 在 SQL 批處理層面 Iceberg 和 Spark 3 集成度更高,主要提供以下三個方面的功能。


        • 定期小文件合并及 meta 信息查詢:

          SELECT * FROM prod.db.table.history 還可查看 snapshots, files, manifests。


        • 離線數(shù)據(jù)寫入:

          1)Insert into

          2)Insert overwrite

          3)Merge into


        • 分析查詢:

          主要支持日常的準(zhǔn)實時分析查詢場景。


        13. 計算引擎 – Trino/Presto


        AutoBI 已經(jīng)和 Presto 集成,用于報表、分析型查詢場景。


        • Trino

          1)直接將 Iceberg 作為報表數(shù)據(jù)源

          2)需要增加元數(shù)據(jù)緩存機制:https://github.com/trinodb/trino/issues/7551


        • Presto

          社區(qū)集成中:https://github.com/prestodb/presto/pull/15836


        14. 踩過的坑


        ■ 1. 訪問 Hive Metastore 異常


        問題描述:HiveConf 的構(gòu)造方法的誤用,導(dǎo)致 Hive 客戶端中聲明的配置被覆蓋,導(dǎo)致訪問 Hive metastore 時異常。


        解決方案(PR-2075):修復(fù) HiveConf 的構(gòu)造,顯示調(diào)用 addResource 方法,確保配置不會被覆蓋:hiveConf.addResource(conf);


        ■ 2.Hive metastore 鎖未釋放


        問題描述:“CommitFailedException: Timed out after 181138 ms waiting for lock xxx.” 原因是 hiveMetastoreClient.lock 方法,在未獲得鎖的情況下,也需要顯示 unlock,否則會導(dǎo)致上面異常。


        解決方案(PR-2263):優(yōu)化 HiveTableOperations#acquireLock 方法,在獲取鎖失敗的情況下顯示調(diào)用 unlock 來釋放鎖。


        ■ 3. 元數(shù)據(jù)文件丟失


        問題描述:Iceberg 表無法訪問,報 “NotFoundException Failed to open input stream for file : xxx.metadata.json”


        解決方案(PR-2328):當(dāng)調(diào)用 Hive metastore 更新 iceberg 表的 metadata_location 超時后,增加檢查機制,確認(rèn)元數(shù)據(jù)未保存成功后再刪除元數(shù)據(jù)文件。


        三、收益與總結(jié)


        1. 總結(jié)


        通過對湖倉一體、流批融合的探索,我們分別做了總結(jié)。


        • 湖倉一體

          1)Iceberg 支持 Hive Metastore;

          2)總體使用上與 Hive 表類似:相同數(shù)據(jù)格式、相同的計算引擎。


        • 流批融合

          準(zhǔn)實時場景下實現(xiàn)流批統(tǒng)一:同源、同計算、同存儲。


        2. 業(yè)務(wù)收益


        • 數(shù)據(jù)時效性提升:

          入倉延遲從 2 小時以上降低到 10 分鐘以內(nèi);算法核心任務(wù) SLA 提前 2 小時完成。


        • 準(zhǔn)實時的分析查詢:

          結(jié)合 Spark 3 和 Trino,支持準(zhǔn)實時的多維分析查詢。


        • 特征工程提效:

          提供準(zhǔn)實時的樣本數(shù)據(jù),提高模型訓(xùn)練時效性。


        • CDC 數(shù)據(jù)準(zhǔn)實時入倉:

          可以在數(shù)倉針對業(yè)務(wù)表做準(zhǔn)實時分析查詢。


        3. 架構(gòu)收益 - 準(zhǔn)實時數(shù)倉



        上方也提到了,我們支持準(zhǔn)實時的入倉和分析,相當(dāng)于是為后續(xù)的準(zhǔn)實時數(shù)倉建設(shè)提供了基礎(chǔ)的架構(gòu)驗證。準(zhǔn)實時數(shù)倉的優(yōu)勢是一次開發(fā)、口徑統(tǒng)一、統(tǒng)一存儲,是真正的批流一體。劣勢是實時性較差,原來可能是秒級、毫秒級的延遲,現(xiàn)在是分鐘級的數(shù)據(jù)可見性。


        但是在架構(gòu)層面上,這個意義還是很大的,后續(xù)我們能看到一些希望,可以把整個原來 “T + 1” 的數(shù)倉,做成準(zhǔn)實時的數(shù)倉,提升數(shù)倉整體的數(shù)據(jù)時效性,然后更好地支持上下游的業(yè)務(wù)。

         

        四、后續(xù)規(guī)劃


        ■ 1. 跟進 Iceberg 版本


        全面開放 V2 格式,支持 CDC 數(shù)據(jù)的 MOR 入湖。


        ■ 2. 建設(shè)準(zhǔn)實時數(shù)倉


        基于 Flink 通過 Data pipeline 模式對數(shù)倉各層表全面提速。


        ■ 3. 流批一體


        隨著 upsert 功能的逐步完善,持續(xù)探索存儲層面流批一體。


        ■ 4. 多維分析


        基于 Presto/Spark3 輸出準(zhǔn)實時多維分析。


        更多 Flink 相關(guān)技術(shù)問題,可掃碼加入社區(qū)釘釘交流群~



        ▼ 關(guān)注「Flink 中文社區(qū)」,獲取更多技術(shù)干貨 


           戳我,查看更多技術(shù)干貨~
        瀏覽 65
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            91城中村无套站街熟妇 | 主播大尺度福利诱惑 | 亚洲 日本 欧美 日韩精品 | ahd101最新av专区 | 日韩成人午夜一区二区 | 日日嗨av一区二区三区免费 | 男人添女人下面真爽免费 | 久久精品国产亚洲AV麻豆澳门 | 123日逼网 | 免费三级a |