Spark數(shù)據(jù)傾斜解決
一、數(shù)據(jù)傾斜表現(xiàn)
數(shù)據(jù)傾斜就是數(shù)據(jù)分到各個區(qū)的數(shù)量不太均勻,可以自定義分區(qū)器,想怎么分就怎么分。
Spark中的數(shù)據(jù)傾斜問題主要指shuffle過程中出現(xiàn)的數(shù)據(jù)傾斜問題,是由于不同的key對應(yīng)的數(shù)據(jù)量不同導(dǎo)致的不同task所處理的數(shù)據(jù)量不同的問題。
例如,reduced端一共要處理100萬條數(shù)據(jù),第一個和第二個task分別被分配到了1萬條數(shù)據(jù),計算5分鐘內(nèi)完成,第三個task分配到了98萬數(shù)據(jù),此時第三個task可能需要10個小時完成,這使得整個Spark作業(yè)需要10個小時才能運行完成,這就是數(shù)據(jù)傾斜所帶來的后果。
注意,要區(qū)分開數(shù)據(jù)傾斜與數(shù)據(jù)過量這兩種情況,數(shù)據(jù)傾斜是指少數(shù)task被分配了絕大多數(shù)的數(shù)據(jù),因此少數(shù)task運行緩慢;數(shù)據(jù)過量是指所有task被分配的數(shù)據(jù)量都很大,相差不多,所有task都運行緩慢。
數(shù)據(jù)傾斜的表現(xiàn):
Spark作業(yè)的大部分task都執(zhí)行迅速,只有有限的幾個task執(zhí)行的非常慢,此時可能出現(xiàn)了數(shù)據(jù)傾斜,作業(yè)可以運行,但是運行得非常慢; Spark作業(yè)的大部分task都執(zhí)行迅速,但是有的task在運行過程中會突然報出OOM,反復(fù)執(zhí)行幾次都在某一個task報出OOM錯誤,此時可能出現(xiàn)了數(shù)據(jù)傾斜,作業(yè)無法正常運行。定位數(shù)據(jù)傾斜問題: 查閱代碼中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,根據(jù)代碼邏輯判斷此處是否會出現(xiàn)數(shù)據(jù)傾斜; 查看Spark作業(yè)的log文件,log文件對于錯誤的記錄會精確到代碼的某一行,可以根據(jù)異常定位到的代碼位置來明確錯誤發(fā)生在第幾個stage,對應(yīng)的shuffle算子是哪一個;
二、數(shù)據(jù)傾斜解決
1. 預(yù)聚合原始數(shù)據(jù)
1. 避免shuffle過程
絕大多數(shù)情況下,Spark作業(yè)的數(shù)據(jù)來源都是Hive表,這些Hive表基本都是經(jīng)過ETL之后的昨天的數(shù)據(jù)。為了避免數(shù)據(jù)傾斜,我們可以考慮避免shuffle過程,如果避免了shuffle過程,那么從根本上就消除了發(fā)生數(shù)據(jù)傾斜問題的可能。
如果Spark作業(yè)的數(shù)據(jù)來源于Hive表,那么可以先在Hive表中對數(shù)據(jù)進行聚合,例如按照key進行分組,將同一key對應(yīng)的所有value用一種特殊的格式拼接到一個字符串里去,這樣,一個key就只有一條數(shù)據(jù)了;之后,對一個key的所有value進行處理時,只需要進行map操作即可,無需再進行任何的shuffle操作。通過上述方式就避免了執(zhí)行shuffle操作,也就不可能會發(fā)生任何的數(shù)據(jù)傾斜問題。
對于Hive表中數(shù)據(jù)的操作,不一定是拼接成一個字符串,也可以是直接對key的每一條數(shù)據(jù)進行累計計算。要區(qū)分開,處理的數(shù)據(jù)量大和數(shù)據(jù)傾斜的區(qū)別。
2. 增大key粒度(減小數(shù)據(jù)傾斜可能性,增大每個task的數(shù)據(jù)量)
如果沒有辦法對每個key聚合出來一條數(shù)據(jù),在特定場景下,可以考慮擴大key的聚合粒度。
例如,目前有10萬條用戶數(shù)據(jù),當(dāng)前key的粒度是(省,城市,區(qū),日期),現(xiàn)在我們考慮擴大粒度,將key的粒度擴大為(省,城市,日期),這樣的話,key的數(shù)量會減少,key之間的數(shù)據(jù)量差異也有可能會減少,由此可以減輕數(shù)據(jù)傾斜的現(xiàn)象和問題。(此方法只針對特定類型的數(shù)據(jù)有效,當(dāng)應(yīng)用場景不適宜時,會加重數(shù)據(jù)傾斜)
2. 預(yù)處理導(dǎo)致傾斜的key
1. 過濾
如果在Spark作業(yè)中允許丟棄某些數(shù)據(jù),那么可以考慮將可能導(dǎo)致數(shù)據(jù)傾斜的key進行過濾,濾除可能導(dǎo)致數(shù)據(jù)傾斜的key對應(yīng)的數(shù)據(jù),這樣,在Spark作業(yè)中就不會發(fā)生數(shù)據(jù)傾斜了。
2. 使用隨機key
當(dāng)使用了類似于groupByKey、reduceByKey這樣的算子時,可以考慮使用隨機key實現(xiàn)雙重聚合,如下圖所示:

首先,通過map算子給每個數(shù)據(jù)的key添加隨機數(shù)前綴,對key進行打散,將原先一樣的key變成不一樣的key,然后進行第一次聚合,這樣就可以讓原本被一個task處理的數(shù)據(jù)分散到多個task上去做局部聚合;隨后,去除掉每個key的前綴,再次進行聚合。
此方法對于由groupByKey、reduceByKey這類算子造成的數(shù)據(jù)傾斜有比較好的效果,僅僅適用于聚合類的shuffle操作,適用范圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。
此方法也是前幾種方案沒有比較好的效果時要嘗試的解決方案。
3. sample采樣對傾斜key單獨進行join
在Spark中,如果某個RDD只有一個key,那么在shuffle過程中會默認將此key對應(yīng)的數(shù)據(jù)打散,由不同的reduce端task進行處理。
所以當(dāng)由單個key導(dǎo)致數(shù)據(jù)傾斜時,可有將發(fā)生數(shù)據(jù)傾斜的key單獨提取出來,組成一個RDD,然后用這個原本會導(dǎo)致傾斜的key組成的RDD和其他RDD單獨join,此時,根據(jù)Spark的運行機制,此RDD中的數(shù)據(jù)會在shuffle階段被分散到多個task中去進行join操作。
傾斜key單獨join的流程如下圖所示:

適用場景分析:
對于RDD中的數(shù)據(jù),可以將其轉(zhuǎn)換為一個中間表,或者是直接使用countByKey()的方式,看一下這個RDD中各個key對應(yīng)的數(shù)據(jù)量,此時如果你發(fā)現(xiàn)整個RDD就一個key的數(shù)據(jù)量特別多,那么就可以考慮使用這種方法。
當(dāng)數(shù)據(jù)量非常大時,可以考慮使用sample采樣獲取10%的數(shù)據(jù),然后分析這10%的數(shù)據(jù)中哪個key可能會導(dǎo)致數(shù)據(jù)傾斜,然后將這個key對應(yīng)的數(shù)據(jù)單獨提取出來。
不適用場景分析:
如果一個RDD中導(dǎo)致數(shù)據(jù)傾斜的key很多,那么此方案不適用。
3. 提高reduce并行度
當(dāng)方案一和方案二對于數(shù)據(jù)傾斜的處理沒有很好的效果時,可以考慮提高shuffle過程中的reduce端并行度,reduce端并行度的提高就增加了reduce端task的數(shù)量,那么每個task分配到的數(shù)據(jù)量就會相應(yīng)減少,由此緩解數(shù)據(jù)傾斜問題。
1. reduce端并行度的設(shè)置
在大部分的shuffle算子中,都可以傳入一個并行度的設(shè)置參數(shù),比如reduceByKey(500),這個參數(shù)會決定shuffle過程中reduce端的并行度,在進行shuffle操作的時候,就會對應(yīng)著創(chuàng)建指定數(shù)量的reduce task。對于Spark SQL中的shuffle類語句,比如group by、join等,需要設(shè)置一個參數(shù),即spark.sql.shuffle.partitions,該參數(shù)代表了shuffle read task的并行度,該值默認是200,對于很多場景來說都有點過小。
增加shuffle read task的數(shù)量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數(shù)據(jù)。
舉例來說,如果原本有5個key,每個key對應(yīng)10條數(shù)據(jù),這5個key都是分配給一個task的,那么這個task就要處理50條數(shù)據(jù)。而增加了shuffle read task以后,每個task就分配到一個key,即每個task就處理10條數(shù)據(jù),那么自然每個task的執(zhí)行時間都會變短了。
2. reduce端并行度設(shè)置存在的缺陷
提高reduce端并行度并沒有從根本上改變數(shù)據(jù)傾斜的本質(zhì)和問題(方案一和方案二從根本上避免了數(shù)據(jù)傾斜的發(fā)生),只是盡可能地去緩解和減輕shuffle reduce task的數(shù)據(jù)壓力,以及數(shù)據(jù)傾斜的問題,適用于有較多key對應(yīng)的數(shù)據(jù)量都比較大的情況。
該方案通常無法徹底解決數(shù)據(jù)傾斜,因為如果出現(xiàn)一些極端情況,比如某個key對應(yīng)的數(shù)據(jù)量有100萬,那么無論你的task數(shù)量增加到多少,這個對應(yīng)著100萬數(shù)據(jù)的key肯定還是會分配到一個task中去處理,因此注定還是會發(fā)生數(shù)據(jù)傾斜的。所以這種方案只能說是在發(fā)現(xiàn)數(shù)據(jù)傾斜時嘗試使用的一種手段,嘗試去用最簡單的方法緩解數(shù)據(jù)傾斜而已,或者是和其他方案結(jié)合起來使用。
在理想情況下,reduce端并行度提升后,會在一定程度上減輕數(shù)據(jù)傾斜的問題,甚至基本消除數(shù)據(jù)傾斜;但是,在一些情況下,只會讓原來由于數(shù)據(jù)傾斜而運行緩慢的task運行速度稍有提升,或者避免了某些task的OOM問題,但是,仍然運行緩慢,此時,要及時放棄方案三,開始嘗試后面的方案。
4. 使用map join
正常情況下,join操作都會執(zhí)行shuffle過程,并且執(zhí)行的是reduce join,也就是先將所有相同的key和對應(yīng)的value匯聚到一個reduce task中,然后再進行join。普通join的過程如下圖所示:

普通的join是會走shuffle過程的,而一旦shuffle,就相當(dāng)于會將相同key的數(shù)據(jù)拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以采用廣播小RDD全量數(shù)據(jù)+map算子來實現(xiàn)與join同樣的效果,也就是map join,此時就不會發(fā)生shuffle操作,也就不會發(fā)生數(shù)據(jù)傾斜。
注意:RDD是并不能直接進行廣播的,只能將RDD內(nèi)部的數(shù)據(jù)通過collect拉取到Driver內(nèi)存然后再進行廣播。
1. 核心思路:
不使用join算子進行連接操作,而使用broadcast變量與map類算子實現(xiàn)join操作,進而完全規(guī)避掉shuffle類的操作,徹底避免數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)。將較小RDD中的數(shù)據(jù)直接通過collect算子拉取到Driver端的內(nèi)存中來,然后對其創(chuàng)建一個broadcast變量;接著對另外一個RDD執(zhí)行map類算子,在算子函數(shù)內(nèi),從broadcast變量中獲取較小RDD的全量數(shù)據(jù),與當(dāng)前RDD的每一條數(shù)據(jù)按照連接key進行比對,如果連接key相同的話,那么就將兩個RDD的數(shù)據(jù)用你需要的方式連接起來。
根據(jù)上述思路,根本不會發(fā)生shuffle操作,從根本上杜絕了join操作可能導(dǎo)致的數(shù)據(jù)傾斜問題。
當(dāng)join操作有數(shù)據(jù)傾斜問題并且其中一個RDD的數(shù)據(jù)量較小時,可以優(yōu)先考慮這種方式,效果非常好。
map join的過程如下圖所示:

2. 不適用場景分析:
由于Spark的廣播變量是在每個Executor中保存一個副本,如果兩個RDD數(shù)據(jù)量都比較大,那么如果將一個數(shù)據(jù)量比較大的RDD做成廣播變量,那么很有可能會造成內(nèi)存溢出。
