1. 實踐場景:解決Spark流處理產(chǎn)生的小文件

        共 3663字,需瀏覽 8分鐘

         ·

        2024-04-11 02:51

        背景

        做流批一體,湖倉一體的大數(shù)據(jù)架構(gòu),常見的做法就是:

        數(shù)據(jù)源->spark Streaming->ODS(數(shù)據(jù)湖)->spark streaming->DWD(數(shù)據(jù)湖)->...

        那么數(shù)據(jù)源->spark Streaming->ODS,以這段為例,在數(shù)據(jù)源通過spark structured streaming寫入ODS在數(shù)據(jù)湖(Delta Lake)落盤時候必然會產(chǎn)生很多小文件。

        目的

        為了在批處理spark-sql運行更快,也避免因為小文件而導(dǎo)致報錯。

        影響

              
              WARNING: Failed to connect to /172.16.xx.xx:9866 for block, add to deadNodes and continue. java.net.SocketException: 
        Too many open files
        1. 小文件在批處理數(shù)據(jù)IO消耗巨大,程序可能卡死。

        2. 小文件塊都有對應(yīng)的元數(shù)據(jù),元數(shù)據(jù)放在NameNode,導(dǎo)致需要的內(nèi)存大大增大,增加NameNode壓力,這樣會限制了集群的擴展。

        3. 在HDFS或者對象儲存中,小文件的讀寫處理速度要遠遠小于大文件,(尋址耗時)。

        解決思路

        事前

        (1)避免寫入時候產(chǎn)生過多小文件 做好分區(qū)partitionBy(年,月,日), 避免小文件過于分散 Trigger觸發(fā)時間可以設(shè)置為1分鐘,這樣會攢一批一寫入,避免秒級別寫入而產(chǎn)生大量小文件(但是使用spark structured 想要做real-time不能這樣,只適合做準實時)

        (2)打開自適應(yīng)框架的開關(guān)

              
              spark.sql.adaptive.enabled true

        (3)通過spark的coalesce()方法和repartition()方法

              
              val rdd2 = rdd1.coalesce(8, true) //(true表示是否shuffle)
        val rdd3 = rdd1.repartition(8)

        coalesce:coalesce()方法的作用是返回指定一個新的指定分區(qū)的Rdd,如果是生成一個窄依賴的結(jié)果,那么可以不發(fā)生shuffle,分區(qū)的數(shù)量發(fā)生激烈的變化,計算節(jié)點不足,不設(shè)置true可能會出錯。

        repartition:coalesce()方法shuffle為true的情況。

        事后(小文件引起已經(jīng)產(chǎn)生)

        (1)優(yōu)化 Delta 表的寫入,避免小文件產(chǎn)生 在開源版 Spark 中,每個 executor 向 partition 中寫入數(shù)據(jù)時,都會創(chuàng)建一個表文件進行寫入,最終會導(dǎo)致一個 partition 中產(chǎn)生很多的小文件。

        Databricks 對 Delta 表的寫入過程進行了優(yōu)化,對每個 partition,使用一個專門的 executor 合并其他 executor 對該 partition 的寫入,從而避免了小文件的產(chǎn)生。

        該特性由表屬性 delta.autoOptimize.optimizeWrite 來控制:可以在創(chuàng)建表時指定

              
              CREATE TABLE student (id INT, name STRING)
        TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

        也可以修改表屬性

              
              ALTER TABLE table_name
        SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

        該特性有兩個優(yōu)點:

        (1)通過減少被寫入的表文件數(shù)量,提高寫數(shù)據(jù)的吞吐量;

        (2)避免小文件的產(chǎn)生,提升查詢性能。

        缺點:

        其缺點也是顯而易見的,由于使用了一個 executor 來合并表文件的寫入,從而降低了表文件寫入的并行度,此外,多引入的一層 executor 需要對寫入的數(shù)據(jù)進行 shuffle,帶來額外的開銷。因此,在使用該特性時,需要對場景進行評估。

        場景:

        該特性適用的場景:頻繁使用 MERGE,UPDATE,DELETE,INSERT INTO,CREATE TABLE AS SELECT 等 SQL 語句的場景;

        該特性不適用的場景:寫入 TB 級以上數(shù)據(jù)。

        (2)自動合并小文件

        在流處理場景中,比如流式數(shù)據(jù)入湖場景下,需要持續(xù)的將到達的數(shù)據(jù)插入到 Delta 表中,每次插入都會創(chuàng)建一個新的表文件用于存儲新到達的數(shù)據(jù),假設(shè)每10s觸發(fā)一次,那么這樣的流處理作業(yè)一天產(chǎn)生的表文件數(shù)量將達到8640個,且由于流處理作業(yè)通常是 long-running 的,運行該流處理作業(yè)100天將產(chǎn)生上百萬個表文件。這樣的 Delta 表,僅元數(shù)據(jù)的維護就是一個很大的挑戰(zhàn),查詢性能更是急劇惡化。

        為了解決上述問題,Databricks 提供了小文件自動合并功能,在每次向 Delta 表中寫入數(shù)據(jù)之后,會檢查 Delta 表中的表文件數(shù)量,如果 Delta 表中的小文件(size < 128MB 的視為小文件)數(shù)量達到閾值,則會執(zhí)行一次小文件合并,將 Delta 表中的小文件合并為一個新的大文件。

        該特性由表屬性 delta.autoOptimize.autoCompact 控制,和特性 delta.autoOptimize.optimizeWrite 相同,可以在創(chuàng)建表時指定,也可以對已創(chuàng)建的表進行修改。自動合并的閾值由 spark.databricks.delta.autoCompact.minNumFiles 控制,默認為50,即小文件數(shù)量達到50會執(zhí)行表文件合并;

        合并后產(chǎn)生的文件最大為128MB,如果需要調(diào)整合并后的目標文件大小,可以通過調(diào)整配置 spark.databricks.delta.autoCompact.maxFileSize 實現(xiàn)。

        (3)手動合并小文件(我常用,每天定時運行合并分區(qū)內(nèi)小文件,再去處理批任務(wù))

        自動小文件合并會在對 Delta 表進行寫入,且寫入后表中小文件達到閾值時被觸發(fā)。除了自動合并之外,Databricks 還提供了 Optimize 命令使用戶可以手動合并小文件,優(yōu)化表結(jié)構(gòu),使得表文件的結(jié)構(gòu)更加緊湊。

        在實現(xiàn)上 Optimize 使用 bin-packing 算法,該算法不但會合并表中的小文件,且合并后生成的表文件也更均衡(表文件大小相近)。例如,我們要對 Delta 表 student 的表文件進行優(yōu)化,僅需執(zhí)行如下命令即可實現(xiàn):(Optimize 命令不但支持全表小文件的合并,還支持特定的分區(qū)的表文件的合并)。

              
              OPTIMIZE student WHERE date >= '2024-01-01'

        附加

        面試官可能會問,我運行optimize合并小文件,但是小文件太多了,直接卡死運行不了程序(某互聯(lián)網(wǎng)面試題)

        (1)首先停掉程序,這里注意deltalake因為有歷史版本這個概念,所以不存在運行一半覆蓋原來版本情況,可以基于上一個版本重新運行(考點)。

        (2)第二點,大數(shù)據(jù)思想分而治之,“分”,即把復(fù)雜的任務(wù)分解為若干個“簡單的任務(wù)”來處理。

              
              OPTIMIZE student WHERE date > '2024-01-01' and date < '2024-01-02'

        因為前面做了partitionby(年月日),那么縮小optimize范圍,在遍歷這個月的每一天日期,分治處理。

        (3)第三點,大數(shù)據(jù)思想,自己不行找兄弟,加節(jié)點,加計算資源。


        瀏覽 50
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 日韩中文字幕视频在线观看 | 亚洲最大成人7777777 | 日韩 精品 无码 系列 视频 | 任你躁精品一区二区三区介绍 | 国产欧美日韩国产 |