Spark Shuffle過程詳解

對(duì)比 Hadoop MapReduce 和 Spark 的 Shuffle 過程
Shuffle write

spark.shuffle.file.buffer.kb ,默認(rèn)是 32KB(Spark 1.1 版本以前是 100KB)。其實(shí) bucket 是一個(gè)廣義的概念,代表 ShuffleMapTask 輸出結(jié)果經(jīng)過 partition 后要存放的地方,這里為了細(xì)化數(shù)據(jù)存放位置和數(shù)據(jù)名稱,僅僅用 bucket 表示緩沖區(qū)。
partitioner.partition(record.getKey()))決定。每個(gè) bucket 里面的數(shù)據(jù)會(huì)不斷被寫到本地磁盤上,形成一個(gè) ShuffleBlockFile,或者簡(jiǎn)稱 FileSegment。之后的 reducer 會(huì)去 fetch 屬于自己的 FileSegment,進(jìn)入 shuffle read 階段。產(chǎn)生的 FileSegment 過多。每個(gè) ShuffleMapTask 產(chǎn)生 R(reducer 個(gè)數(shù))個(gè) FileSegment,M 個(gè) ShuffleMapTask 就會(huì)產(chǎn)生 M * R 個(gè)文件。一般 Spark job 的 M 和 R 都很大,因此磁盤上會(huì)存在大量的數(shù)據(jù)文件。
緩沖區(qū)占用內(nèi)存空間大。每個(gè) ShuffleMapTask 需要開 R 個(gè) bucket,M 個(gè) ShuffleMapTask 就會(huì)產(chǎn)生 M R 個(gè) bucket。雖然一個(gè) ShuffleMapTask 結(jié)束后,對(duì)應(yīng)的緩沖區(qū)可以被回收,但一個(gè) worker node 上同時(shí)存在的 bucket 個(gè)數(shù)可以達(dá)到 cores R 個(gè)(一般 worker 同時(shí)可以運(yùn)行 cores 個(gè) ShuffleMapTask),占用的內(nèi)存空間也就達(dá)到了
cores * R * 32 KB。對(duì)于 8 核 1000 個(gè) reducer 來(lái)說(shuō),占用內(nèi)存就是 256MB。

spark.shuffle.consolidateFiles=true來(lái)開啟。Shuffle read

在什么時(shí)候 fetch,parent stage 中的一個(gè) ShuffleMapTask 執(zhí)行完還是等全部 ShuffleMapTasks 執(zhí)行完?
邊 fetch 邊處理還是一次性 fetch 完再處理?
fetch 來(lái)的數(shù)據(jù)存放到哪里?
怎么獲得要 fetch 的數(shù)據(jù)的存放位置?
在什么時(shí)候 fetch?當(dāng) parent stage 的所有 ShuffleMapTasks 結(jié)束后再 fetch。理論上講,一個(gè) ShuffleMapTask 結(jié)束后就可以 fetch,但是為了迎合 stage 的概念(即一個(gè) stage 如果其 parent stages 沒有執(zhí)行完,自己是不能被提交執(zhí)行的),還是選擇全部 ShuffleMapTasks 執(zhí)行完再去 fetch。因?yàn)?fetch 來(lái)的 FileSegments 要先在內(nèi)存做緩沖,所以一次 fetch 的 FileSegments 總大小不能太大。Spark 規(guī)定這個(gè)緩沖界限不能超過
spark.reducer.maxMbInFlight,這里用 softBuffer 表示,默認(rèn)大小為 48MB。一個(gè) softBuffer 里面一般包含多個(gè) FileSegment,但如果某個(gè) FileSegment 特別大的話,這一個(gè)就可以填滿甚至超過 softBuffer 的界限。邊 fetch 邊處理還是一次性 fetch 完再處理?邊 fetch 邊處理。本質(zhì)上,MapReduce shuffle 階段就是邊 fetch 邊使用 combine() 進(jìn)行處理,只是 combine() 處理的是部分?jǐn)?shù)據(jù)。MapReduce 為了讓進(jìn)入 reduce() 的 records 有序,必須等到全部數(shù)據(jù)都 shuffle-sort 后再開始 reduce()。因?yàn)?Spark 不要求 shuffle 后的數(shù)據(jù)全局有序,因此沒必要等到全部數(shù)據(jù) shuffle 完成后再處理。那么如何實(shí)現(xiàn)邊 shuffle 邊處理,而且流入的 records 是無(wú)序的?答案是使用可以 aggregate 的數(shù)據(jù)結(jié)構(gòu),比如 HashMap。每 shuffle 得到(從緩沖的 FileSegment 中 deserialize 出來(lái))一個(gè) \
record,直接將其放進(jìn) HashMap 里面。如果該 HashMap 已經(jīng)存在相應(yīng)的 Key,那么直接進(jìn)行 aggregate 也就是 func(hashMap.get(Key), Value),比如上面 WordCount 例子中的 func 就是hashMap.get(Key) + Value,并將 func 的結(jié)果重新 put(key) 到 HashMap 中去。這個(gè) func 功能上相當(dāng)于 reduce(),但實(shí)際處理數(shù)據(jù)的方式與 MapReduce reduce() 有差別,差別相當(dāng)于下面兩段程序的差別。// MapReduce
reduce(K key, Iterable<V> values) {
result = process(key, values)
return result
}
// Spark
reduce(K key, Iterable<V> values) {
result = null
for (V value : values)
result = func(result, value)
return result
}MapReduce 可以在 process 函數(shù)里面可以定義任何數(shù)據(jù)結(jié)構(gòu),也可以將部分或全部的 values 都 cache 后再進(jìn)行處理,非常靈活。而 Spark 中的 func 的輸入?yún)?shù)是固定的,一個(gè)是上一個(gè) record 的處理結(jié)果,另一個(gè)是當(dāng)前讀入的 record,它們經(jīng)過 func 處理后的結(jié)果被下一個(gè) record 處理時(shí)使用。因此一些算法比如求平均數(shù),在 process 里面很好實(shí)現(xiàn),直接
sum(values)/values.length,而在 Spark 中 func 可以實(shí)現(xiàn)sum(values),但不好實(shí)現(xiàn)/values.length。更多的 func 將會(huì)在下面的章節(jié)細(xì)致分析。fetch 來(lái)的數(shù)據(jù)存放到哪里?剛 fetch 來(lái)的 FileSegment 存放在 softBuffer 緩沖區(qū),經(jīng)過處理后的數(shù)據(jù)放在內(nèi)存 + 磁盤上。這里我們主要討論處理后的數(shù)據(jù),可以靈活設(shè)置這些數(shù)據(jù)是“只用內(nèi)存”還是“內(nèi)存+磁盤”。如果
spark.shuffle.spill = false就只用內(nèi)存。內(nèi)存使用的是AppendOnlyMap,類似 Java 的HashMap,內(nèi)存+磁盤使用的是ExternalAppendOnlyMap,如果內(nèi)存空間不足時(shí),ExternalAppendOnlyMap可以將 \records 進(jìn)行 sort 后 spill 到磁盤上,等到需要它們的時(shí)候再進(jìn)行歸并,后面會(huì)詳解。使用“內(nèi)存+磁盤”的一個(gè)主要問題就是如何在兩者之間取得平衡?在 Hadoop MapReduce 中,默認(rèn)將 reducer 的 70% 的內(nèi)存空間用于存放 shuffle 來(lái)的數(shù)據(jù),等到這個(gè)空間利用率達(dá)到 66% 的時(shí)候就開始 merge-combine()-spill。在 Spark 中,也適用同樣的策略,一旦 ExternalAppendOnlyMap 達(dá)到一個(gè)閾值就開始 spill,具體細(xì)節(jié)下面會(huì)討論。 怎么獲得要 fetch 的數(shù)據(jù)的存放位置?在上一章討論物理執(zhí)行圖中的 stage 劃分的時(shí)候,我們強(qiáng)調(diào) “一個(gè) ShuffleMapStage 形成后,會(huì)將該 stage 最后一個(gè) final RDD 注冊(cè)到
MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size),這一步很重要,因?yàn)?shuffle 過程需要 MapOutputTrackerMaster 來(lái)指示 ShuffleMapTask 輸出數(shù)據(jù)的位置”。因此,reducer 在 shuffle 的時(shí)候是要去 driver 里面的 MapOutputTrackerMaster 詢問 ShuffleMapTask 輸出的數(shù)據(jù)位置的。每個(gè) ShuffleMapTask 完成時(shí)會(huì)將 FileSegment 的存儲(chǔ)位置信息匯報(bào)給 MapOutputTrackerMaster。
典型 transformation() 的 shuffle read
1. reduceByKey(func)

map 端的區(qū)別:map() 沒有區(qū)別。對(duì)于 combine(),MapReduce 先 sort 再 combine(),Spark 直接在 HashMap 上進(jìn)行 combine()。
reduce 端區(qū)別:MapReduce 的 shuffle 階段先 fetch 數(shù)據(jù),數(shù)據(jù)量到達(dá)一定規(guī)模后 combine(),再將剩余數(shù)據(jù) merge-sort 后 reduce(),reduce() 非常靈活。Spark 邊 fetch 邊 reduce()(在 HashMap 上執(zhí)行 func),因此要求 func 符合 commulative 的特性。
map 端區(qū)別:MapReduce 需要開一個(gè)大型環(huán)形緩沖區(qū)來(lái)暫存和排序 map() 的部分輸出結(jié)果,但 combine() 不需要額外空間(除非用戶自己定義)。Spark 需要 HashMap 內(nèi)存數(shù)據(jù)結(jié)構(gòu)來(lái)進(jìn)行 combine(),同時(shí)輸出 records 到磁盤上時(shí)也需要一個(gè)小的 buffer(bucket)。
reduce 端區(qū)別:MapReduce 需要一部分內(nèi)存空間來(lái)存儲(chǔ) shuffle 過來(lái)的數(shù)據(jù),combine() 和 reduce() 不需要額外空間,因?yàn)樗鼈兊妮斎霐?shù)據(jù)分段有序,只需歸并一下就可以得到。在 Spark 中,fetch 時(shí)需要 softBuffer,處理數(shù)據(jù)時(shí)如果只使用內(nèi)存,那么需要 HashMap 來(lái)持有處理后的結(jié)果。如果使用內(nèi)存+磁盤,那么在 HashMap 存放一部分處理后的數(shù)據(jù)。
2. groupByKey(numPartitions)

result = result ++ record.value,功能是將每個(gè) key 對(duì)應(yīng)的所有 values 鏈接在一起。result 來(lái)自 hashMap.get(record.key),計(jì)算后的 result 會(huì)再次被 put 到 hashMap 中。與 reduceByKey() 的區(qū)別就是 groupByKey() 沒有 map 端的 combine()。對(duì)于 groupByKey() 來(lái)說(shuō) map 端的 combine() 只是減少了重復(fù) Key 占用的空間,如果 key 重復(fù)率不高,沒必要 combine(),否則,最好能夠 combine()。3. distinct(numPartitions)

result = result == null? record.value : result,如果 HashMap 中沒有該 record 就將其放入,否則舍棄。與 reduceByKey() 相同,在map 端存在 combine()。4. cogroup(otherRDD, numPartitions)

5. intersection(otherRDD) 和 join(otherRDD, numPartitions)


6. sortByKey(ascending, numPartitions)

7. coalesce(numPartitions, shuffle = true)

Shuffle read 中的 HashMap
1. AppendOnlyMap
remove(key)方法。其實(shí)現(xiàn)原理很簡(jiǎn)單,開一個(gè)大 Object 數(shù)組,藍(lán)色部分存儲(chǔ) Key,白色部分存儲(chǔ) Value。如下圖:
destructiveSortedIterator(): Iterator[(K, V)] 方法,可以返回 Array 中排序后的 (K, V) pairs。實(shí)現(xiàn)方法很簡(jiǎn)單:先將所有 (K, V) pairs compact 到 Array 的前端,并使得每個(gè) (K, V) 占一個(gè)位置(原來(lái)占兩個(gè)),之后直接調(diào)用 Array.sort() 排序,不過這樣做會(huì)破壞數(shù)組(key 的位置變化了)。2. ExternalAppendOnlyMap

內(nèi)存剩余空間檢測(cè)
與 Hadoop MapReduce 規(guī)定 reducer 中 70% 的空間可用于 shuffle-sort 類似,Spark 也規(guī)定 executor 中
spark.shuffle.memoryFraction * spark.shuffle.safetyFraction的空間(默認(rèn)是0.3 * 0.8)可用于 ExternalOnlyAppendMap。Spark 略保守是不是?更保守的是這 24% 的空間不是完全用于一個(gè) ExternalOnlyAppendMap 的,而是由在 executor 上同時(shí)運(yùn)行的所有 reducer 共享的。為此,exectuor 專門持有一個(gè)ShuffleMemroyMap: HashMap[threadId, occupiedMemory]來(lái)監(jiān)控每個(gè) reducer 中 ExternalOnlyAppendMap 占用的內(nèi)存量。每當(dāng) AppendOnlyMap 要擴(kuò)展時(shí),都會(huì)計(jì)算 ShuffleMemroyMap 持有的所有 reducer 中的 AppendOnlyMap 已占用的內(nèi)存 + 擴(kuò)展后的內(nèi)存 是會(huì)否會(huì)大于內(nèi)存限制,大于就會(huì)將 AppendOnlyMap spill 到磁盤。有一點(diǎn)需要注意的是前 1000 個(gè) records 進(jìn)入 AppendOnlyMap 的時(shí)候不會(huì)啟動(dòng)是否要 spill 的檢查,需要擴(kuò)展時(shí)就直接在內(nèi)存中擴(kuò)展。AppendOnlyMap 大小估計(jì)
為了獲知 AppendOnlyMap 占用的內(nèi)存空間,可以在每次擴(kuò)展時(shí)都將 AppendOnlyMap reference 的所有 objects 大小都算一遍,然后加和,但這樣做非常耗時(shí)。所以 Spark 設(shè)計(jì)了粗略的估算算法,算法時(shí)間復(fù)雜度是 O(1),核心思想是利用 AppendOnlyMap 中每次 insert-aggregate record 后 result 的大小變化及一共 insert 的 records 的個(gè)數(shù)來(lái)估算大小,具體見
SizeTrackingAppendOnlyMap和SizeEstimator。Spill 過程
與 shuffle write 一樣,在 spill records 到磁盤上的時(shí)候,會(huì)建立一個(gè) buffer 緩沖區(qū),大小仍為
spark.shuffle.file.buffer.kb,默認(rèn)是 32KB。另外,由于 serializer 也會(huì)分配緩沖區(qū)用于序列化和反序列化,所以如果一次 serialize 的 records 過多的話緩沖區(qū)會(huì)變得很大。Spark 限制每次 serialize 的 records 個(gè)數(shù)為spark.shuffle.spill.batchSize,默認(rèn)是 10000。
Discussion

