1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Spark數(shù)據(jù)傾斜解決

        共 4234字,需瀏覽 9分鐘

         ·

        2022-07-25 00:39

        一、數(shù)據(jù)傾斜表現(xiàn)

        數(shù)據(jù)傾斜就是數(shù)據(jù)分到各個區(qū)的數(shù)量不太均勻,可以自定義分區(qū)器,想怎么分就怎么分。

        Spark中的數(shù)據(jù)傾斜問題主要指shuffle過程中出現(xiàn)的數(shù)據(jù)傾斜問題,是由于不同的key對應(yīng)的數(shù)據(jù)量不同導(dǎo)致的不同task所處理的數(shù)據(jù)量不同的問題。

        例如,reduced端一共要處理100萬條數(shù)據(jù),第一個和第二個task分別被分配到了1萬條數(shù)據(jù),計算5分鐘內(nèi)完成,第三個task分配到了98萬數(shù)據(jù),此時第三個task可能需要10個小時完成,這使得整個Spark作業(yè)需要10個小時才能運行完成,這就是數(shù)據(jù)傾斜所帶來的后果。

        注意,要區(qū)分開數(shù)據(jù)傾斜數(shù)據(jù)過量這兩種情況,數(shù)據(jù)傾斜是指少數(shù)task被分配了絕大多數(shù)的數(shù)據(jù),因此少數(shù)task運行緩慢;數(shù)據(jù)過量是指所有task被分配的數(shù)據(jù)量都很大,相差不多,所有task都運行緩慢。

        數(shù)據(jù)傾斜的表現(xiàn):

        1. Spark作業(yè)的大部分task都執(zhí)行迅速,只有有限的幾個task執(zhí)行的非常慢,此時可能出現(xiàn)了數(shù)據(jù)傾斜,作業(yè)可以運行,但是運行得非常慢;
        2. Spark作業(yè)的大部分task都執(zhí)行迅速,但是有的task在運行過程中會突然報出OOM,反復(fù)執(zhí)行幾次都在某一個task報出OOM錯誤,此時可能出現(xiàn)了數(shù)據(jù)傾斜,作業(yè)無法正常運行。定位數(shù)據(jù)傾斜問題:
        3. 查閱代碼中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,根據(jù)代碼邏輯判斷此處是否會出現(xiàn)數(shù)據(jù)傾斜;
        4. 查看Spark作業(yè)的log文件,log文件對于錯誤的記錄會精確到代碼的某一行,可以根據(jù)異常定位到的代碼位置來明確錯誤發(fā)生在第幾個stage,對應(yīng)的shuffle算子是哪一個;

        二、數(shù)據(jù)傾斜解決

        1. 預(yù)聚合原始數(shù)據(jù)

        1. 避免shuffle過程

        絕大多數(shù)情況下,Spark作業(yè)的數(shù)據(jù)來源都是Hive表,這些Hive表基本都是經(jīng)過ETL之后的昨天的數(shù)據(jù)。為了避免數(shù)據(jù)傾斜,我們可以考慮避免shuffle過程,如果避免了shuffle過程,那么從根本上就消除了發(fā)生數(shù)據(jù)傾斜問題的可能。

        如果Spark作業(yè)的數(shù)據(jù)來源于Hive表,那么可以先在Hive表中對數(shù)據(jù)進行聚合,例如按照key進行分組,將同一key對應(yīng)的所有value用一種特殊的格式拼接到一個字符串里去,這樣,一個key就只有一條數(shù)據(jù)了;之后,對一個key的所有value進行處理時,只需要進行map操作即可,無需再進行任何的shuffle操作。通過上述方式就避免了執(zhí)行shuffle操作,也就不可能會發(fā)生任何的數(shù)據(jù)傾斜問題。

        對于Hive表中數(shù)據(jù)的操作,不一定是拼接成一個字符串,也可以是直接對key的每一條數(shù)據(jù)進行累計計算。要區(qū)分開,處理的數(shù)據(jù)量大和數(shù)據(jù)傾斜的區(qū)別。

        2. 增大key粒度(減小數(shù)據(jù)傾斜可能性,增大每個task的數(shù)據(jù)量)

        如果沒有辦法對每個key聚合出來一條數(shù)據(jù),在特定場景下,可以考慮擴大key的聚合粒度。

        例如,目前有10萬條用戶數(shù)據(jù),當(dāng)前key的粒度是(省,城市,區(qū),日期),現(xiàn)在我們考慮擴大粒度,將key的粒度擴大為(省,城市,日期),這樣的話,key的數(shù)量會減少,key之間的數(shù)據(jù)量差異也有可能會減少,由此可以減輕數(shù)據(jù)傾斜的現(xiàn)象和問題。(此方法只針對特定類型的數(shù)據(jù)有效,當(dāng)應(yīng)用場景不適宜時,會加重數(shù)據(jù)傾斜)

        2. 預(yù)處理導(dǎo)致傾斜的key

        1. 過濾

        如果在Spark作業(yè)中允許丟棄某些數(shù)據(jù),那么可以考慮將可能導(dǎo)致數(shù)據(jù)傾斜的key進行過濾,濾除可能導(dǎo)致數(shù)據(jù)傾斜的key對應(yīng)的數(shù)據(jù),這樣,在Spark作業(yè)中就不會發(fā)生數(shù)據(jù)傾斜了。

        2. 使用隨機key

        當(dāng)使用了類似于groupByKey、reduceByKey這樣的算子時,可以考慮使用隨機key實現(xiàn)雙重聚合,如下圖所示:

        隨機key實現(xiàn)雙重聚合

        首先,通過map算子給每個數(shù)據(jù)的key添加隨機數(shù)前綴,對key進行打散,將原先一樣的key變成不一樣的key,然后進行第一次聚合,這樣就可以讓原本被一個task處理的數(shù)據(jù)分散到多個task上去做局部聚合;隨后,去除掉每個key的前綴,再次進行聚合。

        此方法對于由groupByKey、reduceByKey這類算子造成的數(shù)據(jù)傾斜有比較好的效果,僅僅適用于聚合類的shuffle操作,適用范圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案

        此方法也是前幾種方案沒有比較好的效果時要嘗試的解決方案。

        3. sample采樣對傾斜key單獨進行join

        在Spark中,如果某個RDD只有一個key,那么在shuffle過程中會默認將此key對應(yīng)的數(shù)據(jù)打散,由不同的reduce端task進行處理。

        所以當(dāng)由單個key導(dǎo)致數(shù)據(jù)傾斜時,可有將發(fā)生數(shù)據(jù)傾斜的key單獨提取出來,組成一個RDD,然后用這個原本會導(dǎo)致傾斜的key組成的RDD和其他RDD單獨join,此時,根據(jù)Spark的運行機制,此RDD中的數(shù)據(jù)會在shuffle階段被分散到多個task中去進行join操作。

        傾斜key單獨join的流程如下圖所示:

        傾斜key單獨join流程

        適用場景分析:

        對于RDD中的數(shù)據(jù),可以將其轉(zhuǎn)換為一個中間表,或者是直接使用countByKey()的方式,看一下這個RDD中各個key對應(yīng)的數(shù)據(jù)量,此時如果你發(fā)現(xiàn)整個RDD就一個key的數(shù)據(jù)量特別多,那么就可以考慮使用這種方法。

        當(dāng)數(shù)據(jù)量非常大時,可以考慮使用sample采樣獲取10%的數(shù)據(jù),然后分析這10%的數(shù)據(jù)中哪個key可能會導(dǎo)致數(shù)據(jù)傾斜,然后將這個key對應(yīng)的數(shù)據(jù)單獨提取出來。

        不適用場景分析:

        如果一個RDD中導(dǎo)致數(shù)據(jù)傾斜的key很多,那么此方案不適用。

        3. 提高reduce并行度

        當(dāng)方案一和方案二對于數(shù)據(jù)傾斜的處理沒有很好的效果時,可以考慮提高shuffle過程中的reduce端并行度,reduce端并行度的提高就增加了reduce端task的數(shù)量,那么每個task分配到的數(shù)據(jù)量就會相應(yīng)減少,由此緩解數(shù)據(jù)傾斜問題。

        1. reduce端并行度的設(shè)置

        在大部分的shuffle算子中,都可以傳入一個并行度的設(shè)置參數(shù),比如reduceByKey(500),這個參數(shù)會決定shuffle過程中reduce端的并行度,在進行shuffle操作的時候,就會對應(yīng)著創(chuàng)建指定數(shù)量的reduce task。對于Spark SQL中的shuffle類語句,比如group by、join等,需要設(shè)置一個參數(shù),即spark.sql.shuffle.partitions,該參數(shù)代表了shuffle read task的并行度,該值默認是200,對于很多場景來說都有點過小。

        增加shuffle read task的數(shù)量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數(shù)據(jù)。

        舉例來說,如果原本有5個key,每個key對應(yīng)10條數(shù)據(jù),這5個key都是分配給一個task的,那么這個task就要處理50條數(shù)據(jù)。而增加了shuffle read task以后,每個task就分配到一個key,即每個task就處理10條數(shù)據(jù),那么自然每個task的執(zhí)行時間都會變短了。

        2. reduce端并行度設(shè)置存在的缺陷

        提高reduce端并行度并沒有從根本上改變數(shù)據(jù)傾斜的本質(zhì)和問題(方案一和方案二從根本上避免了數(shù)據(jù)傾斜的發(fā)生),只是盡可能地去緩解和減輕shuffle reduce task的數(shù)據(jù)壓力,以及數(shù)據(jù)傾斜的問題,適用于有較多key對應(yīng)的數(shù)據(jù)量都比較大的情況。

        該方案通常無法徹底解決數(shù)據(jù)傾斜,因為如果出現(xiàn)一些極端情況,比如某個key對應(yīng)的數(shù)據(jù)量有100萬,那么無論你的task數(shù)量增加到多少,這個對應(yīng)著100萬數(shù)據(jù)的key肯定還是會分配到一個task中去處理,因此注定還是會發(fā)生數(shù)據(jù)傾斜的。所以這種方案只能說是在發(fā)現(xiàn)數(shù)據(jù)傾斜時嘗試使用的一種手段,嘗試去用最簡單的方法緩解數(shù)據(jù)傾斜而已,或者是和其他方案結(jié)合起來使用。

        在理想情況下,reduce端并行度提升后,會在一定程度上減輕數(shù)據(jù)傾斜的問題,甚至基本消除數(shù)據(jù)傾斜;但是,在一些情況下,只會讓原來由于數(shù)據(jù)傾斜而運行緩慢的task運行速度稍有提升,或者避免了某些task的OOM問題,但是,仍然運行緩慢,此時,要及時放棄方案三,開始嘗試后面的方案。

        4. 使用map join

        正常情況下,join操作都會執(zhí)行shuffle過程,并且執(zhí)行的是reduce join,也就是先將所有相同的key和對應(yīng)的value匯聚到一個reduce task中,然后再進行join。普通join的過程如下圖所示:

        普通join過程

        普通的join是會走shuffle過程的,而一旦shuffle,就相當(dāng)于會將相同key的數(shù)據(jù)拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以采用廣播小RDD全量數(shù)據(jù)+map算子來實現(xiàn)與join同樣的效果,也就是map join,此時就不會發(fā)生shuffle操作,也就不會發(fā)生數(shù)據(jù)傾斜。

        注意:RDD是并不能直接進行廣播的,只能將RDD內(nèi)部的數(shù)據(jù)通過collect拉取到Driver內(nèi)存然后再進行廣播

        1. 核心思路:

        不使用join算子進行連接操作,而使用broadcast變量與map類算子實現(xiàn)join操作,進而完全規(guī)避掉shuffle類的操作,徹底避免數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)。將較小RDD中的數(shù)據(jù)直接通過collect算子拉取到Driver端的內(nèi)存中來,然后對其創(chuàng)建一個broadcast變量;接著對另外一個RDD執(zhí)行map類算子,在算子函數(shù)內(nèi),從broadcast變量中獲取較小RDD的全量數(shù)據(jù),與當(dāng)前RDD的每一條數(shù)據(jù)按照連接key進行比對,如果連接key相同的話,那么就將兩個RDD的數(shù)據(jù)用你需要的方式連接起來。

        根據(jù)上述思路,根本不會發(fā)生shuffle操作,從根本上杜絕了join操作可能導(dǎo)致的數(shù)據(jù)傾斜問題。

        當(dāng)join操作有數(shù)據(jù)傾斜問題并且其中一個RDD的數(shù)據(jù)量較小時,可以優(yōu)先考慮這種方式,效果非常好。

        map join的過程如下圖所示:

        map join過程

        2. 不適用場景分析:

        由于Spark的廣播變量是在每個Executor中保存一個副本,如果兩個RDD數(shù)據(jù)量都比較大,那么如果將一個數(shù)據(jù)量比較大的RDD做成廣播變量,那么很有可能會造成內(nèi)存溢出。

        瀏覽 18
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            久草福利视频 | 口述交换做爰全过程 | 成人片试看 | brazzersvideosexhd欧美高清 | 亚洲爱 | 色秘 乱码一区二区三区88 | 五月天国产激情视频在线观看 | 天天综合网站入口即化 | 色综合色综合网色综合 | 新婚娇妻和别人做爰h |