国产秋霞理论久久久电影-婷婷色九月综合激情丁香-欧美在线观看乱妇视频-精品国avA久久久久久久-国产乱码精品一区二区三区亚洲人-欧美熟妇一区二区三区蜜桃视频

Spark Shuffle過程詳解

共 13963字,需瀏覽 28分鐘

 ·

2021-04-15 08:15

點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)
回復(fù)”資源“獲取更多資源

?

?你需要預(yù)習(xí):
《Spark的Cache和Checkpoint區(qū)別和聯(lián)系拾遺

《Spark Job 邏輯執(zhí)行圖和數(shù)據(jù)依賴解析

《Spark Job 物理執(zhí)行圖詳解

上一章里討論了 job 的物理執(zhí)行圖,也討論了流入 RDD 中的 records 是怎么被 compute() 后流到后續(xù) RDD 的,同時(shí)也分析了 task 是怎么產(chǎn)生 result,以及 result 怎么被收集后計(jì)算出最終結(jié)果的。然而,我們還沒有討論數(shù)據(jù)是怎么通過 ShuffleDependency 流向下一個(gè) stage 的?

對(duì)比 Hadoop MapReduce 和 Spark 的 Shuffle 過程

如果熟悉 Hadoop MapReduce 中的 shuffle 過程,可能會(huì)按照 MapReduce 的思路去想象 Spark 的 shuffle 過程。然而,它們之間有一些區(qū)別和聯(lián)系。
從 high-level 的角度來(lái)看,兩者并沒有大的差別。 都是將 mapper(Spark 里是 ShuffleMapTask)的輸出進(jìn)行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一個(gè) stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以內(nèi)存作緩沖區(qū),邊 shuffle 邊 aggregate 數(shù)據(jù),等到數(shù)據(jù) aggregate 好以后進(jìn)行 reduce() (Spark 里可能是后續(xù)的一系列操作)。
從 low-level 的角度來(lái)看,兩者差別不小。 Hadoop MapReduce 是 sort-based,進(jìn)入 combine() 和 reduce() 的 records 必須先 sort。這樣的好處在于 combine/reduce() 可以處理大規(guī)模的數(shù)據(jù),因?yàn)槠漭斎霐?shù)據(jù)可以通過外排得到(mapper 對(duì)每段數(shù)據(jù)先做排序,reducer 的 shuffle 對(duì)排好序的每段數(shù)據(jù)做歸并)。目前的 Spark 默認(rèn)選擇的是 hash-based,通常使用 HashMap 來(lái)對(duì) shuffle 來(lái)的數(shù)據(jù)進(jìn)行 aggregate,不會(huì)對(duì)數(shù)據(jù)進(jìn)行提前排序。如果用戶需要經(jīng)過排序的數(shù)據(jù),那么需要自己調(diào)用類似 sortByKey() 的操作;如果你是Spark 1.1的用戶,可以將spark.shuffle.manager設(shè)置為sort,則會(huì)對(duì)數(shù)據(jù)進(jìn)行排序。在Spark 1.2中,sort將作為默認(rèn)的Shuffle實(shí)現(xiàn)。
從實(shí)現(xiàn)角度來(lái)看,兩者也有不少差別。 Hadoop MapReduce 將處理流程劃分出明顯的幾個(gè)階段:map(), spill, merge, shuffle, sort, reduce() 等。每個(gè)階段各司其職,可以按照過程式的編程思想來(lái)逐一實(shí)現(xiàn)每個(gè)階段的功能。在 Spark 中,沒有這樣功能明確的階段,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蘊(yùn)含在 transformation() 中。
如果我們將 map 端劃分?jǐn)?shù)據(jù)、持久化數(shù)據(jù)的過程稱為 shuffle write,而將 reducer 讀入數(shù)據(jù)、aggregate 數(shù)據(jù)的過程稱為 shuffle read。那么在 Spark 中,問題就變?yōu)樵趺丛?job 的邏輯或者物理執(zhí)行圖中加入 shuffle write 和 shuffle read 的處理邏輯?以及兩個(gè)處理邏輯應(yīng)該怎么高效實(shí)現(xiàn)?

Shuffle write

由于不要求數(shù)據(jù)有序,shuffle write 的任務(wù)很簡(jiǎn)單:將數(shù)據(jù) partition 好,并持久化。之所以要持久化,一方面是要減少內(nèi)存存儲(chǔ)空間壓力,另一方面也是為了 fault-tolerance。
shuffle write 的任務(wù)很簡(jiǎn)單,那么實(shí)現(xiàn)也很簡(jiǎn)單:將 shuffle write 的處理邏輯加入到 ShuffleMapStage(ShuffleMapTask 所在的 stage) 的最后,該 stage 的 final RDD 每輸出一個(gè) record 就將其 partition 并持久化。圖示如下:

上圖有 4 個(gè) ShuffleMapTask 要在同一個(gè) worker node 上運(yùn)行,CPU core 數(shù)為 2,可以同時(shí)運(yùn)行兩個(gè) task。每個(gè) task 的執(zhí)行結(jié)果(該 stage 的 finalRDD 中某個(gè) partition 包含的 records)被逐一寫到本地磁盤上。每個(gè) task 包含 R 個(gè)緩沖區(qū),R = reducer 個(gè)數(shù)(也就是下一個(gè) stage 中 task 的個(gè)數(shù)),緩沖區(qū)被稱為 bucket,其大小為spark.shuffle.file.buffer.kb ,默認(rèn)是 32KB(Spark 1.1 版本以前是 100KB)。
其實(shí) bucket 是一個(gè)廣義的概念,代表 ShuffleMapTask 輸出結(jié)果經(jīng)過 partition 后要存放的地方,這里為了細(xì)化數(shù)據(jù)存放位置和數(shù)據(jù)名稱,僅僅用 bucket 表示緩沖區(qū)。
ShuffleMapTask 的執(zhí)行過程很簡(jiǎn)單:先利用 pipeline 計(jì)算得到 finalRDD 中對(duì)應(yīng) partition 的 records。每得到一個(gè) record 就將其送到對(duì)應(yīng)的 bucket 里,具體是哪個(gè) bucket 由partitioner.partition(record.getKey()))決定。每個(gè) bucket 里面的數(shù)據(jù)會(huì)不斷被寫到本地磁盤上,形成一個(gè) ShuffleBlockFile,或者簡(jiǎn)稱 FileSegment。之后的 reducer 會(huì)去 fetch 屬于自己的 FileSegment,進(jìn)入 shuffle read 階段。
這樣的實(shí)現(xiàn)很簡(jiǎn)單,但有幾個(gè)問題:
  1. 產(chǎn)生的 FileSegment 過多。每個(gè) ShuffleMapTask 產(chǎn)生 R(reducer 個(gè)數(shù))個(gè) FileSegment,M 個(gè) ShuffleMapTask 就會(huì)產(chǎn)生 M * R 個(gè)文件。一般 Spark job 的 M 和 R 都很大,因此磁盤上會(huì)存在大量的數(shù)據(jù)文件。

  2. 緩沖區(qū)占用內(nèi)存空間大。每個(gè) ShuffleMapTask 需要開 R 個(gè) bucket,M 個(gè) ShuffleMapTask 就會(huì)產(chǎn)生 M R 個(gè) bucket。雖然一個(gè) ShuffleMapTask 結(jié)束后,對(duì)應(yīng)的緩沖區(qū)可以被回收,但一個(gè) worker node 上同時(shí)存在的 bucket 個(gè)數(shù)可以達(dá)到 cores R 個(gè)(一般 worker 同時(shí)可以運(yùn)行 cores 個(gè) ShuffleMapTask),占用的內(nèi)存空間也就達(dá)到了cores * R * 32 KB。對(duì)于 8 核 1000 個(gè) reducer 來(lái)說(shuō),占用內(nèi)存就是 256MB。

目前來(lái)看,第二個(gè)問題還沒有好的方法解決,因?yàn)閷懘疟P終究是要開緩沖區(qū)的,緩沖區(qū)太小會(huì)影響 IO 速度。但第一個(gè)問題有一些方法去解決,下面介紹已經(jīng)在 Spark 里面實(shí)現(xiàn)的 FileConsolidation 方法。先上圖:

可以明顯看出,在一個(gè) core 上連續(xù)執(zhí)行的 ShuffleMapTasks 可以共用一個(gè)輸出文件 ShuffleFile。先執(zhí)行完的 ShuffleMapTask 形成 ShuffleBlock i,后執(zhí)行的 ShuffleMapTask 可以將輸出數(shù)據(jù)直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每個(gè) ShuffleBlock 被稱為 FileSegment。下一個(gè) stage 的 reducer 只需要 fetch 整個(gè) ShuffleFile 就行了。這樣,每個(gè) worker 持有的文件數(shù)降為 cores * R。FileConsolidation 功能可以通過spark.shuffle.consolidateFiles=true來(lái)開啟。

Shuffle read

先看一張包含 ShuffleDependency 的物理執(zhí)行圖,來(lái)自 reduceByKey:

很自然地,要計(jì)算 ShuffleRDD 中的數(shù)據(jù),必須先把 MapPartitionsRDD 中的數(shù)據(jù) fetch 過來(lái)。那么問題就來(lái)了:
  • 在什么時(shí)候 fetch,parent stage 中的一個(gè) ShuffleMapTask 執(zhí)行完還是等全部 ShuffleMapTasks 執(zhí)行完?

  • 邊 fetch 邊處理還是一次性 fetch 完再處理?

  • fetch 來(lái)的數(shù)據(jù)存放到哪里?

  • 怎么獲得要 fetch 的數(shù)據(jù)的存放位置?

解決問題:
  • 在什么時(shí)候 fetch?當(dāng) parent stage 的所有 ShuffleMapTasks 結(jié)束后再 fetch。理論上講,一個(gè) ShuffleMapTask 結(jié)束后就可以 fetch,但是為了迎合 stage 的概念(即一個(gè) stage 如果其 parent stages 沒有執(zhí)行完,自己是不能被提交執(zhí)行的),還是選擇全部 ShuffleMapTasks 執(zhí)行完再去 fetch。因?yàn)?fetch 來(lái)的 FileSegments 要先在內(nèi)存做緩沖,所以一次 fetch 的 FileSegments 總大小不能太大。Spark 規(guī)定這個(gè)緩沖界限不能超過 spark.reducer.maxMbInFlight,這里用 softBuffer 表示,默認(rèn)大小為 48MB。一個(gè) softBuffer 里面一般包含多個(gè) FileSegment,但如果某個(gè) FileSegment 特別大的話,這一個(gè)就可以填滿甚至超過 softBuffer 的界限。

  • 邊 fetch 邊處理還是一次性 fetch 完再處理?邊 fetch 邊處理。本質(zhì)上,MapReduce shuffle 階段就是邊 fetch 邊使用 combine() 進(jìn)行處理,只是 combine() 處理的是部分?jǐn)?shù)據(jù)。MapReduce 為了讓進(jìn)入 reduce() 的 records 有序,必須等到全部數(shù)據(jù)都 shuffle-sort 后再開始 reduce()。因?yàn)?Spark 不要求 shuffle 后的數(shù)據(jù)全局有序,因此沒必要等到全部數(shù)據(jù) shuffle 完成后再處理。那么如何實(shí)現(xiàn)邊 shuffle 邊處理,而且流入的 records 是無(wú)序的?答案是使用可以 aggregate 的數(shù)據(jù)結(jié)構(gòu),比如 HashMap。每 shuffle 得到(從緩沖的 FileSegment 中 deserialize 出來(lái))一個(gè) \ record,直接將其放進(jìn) HashMap 里面。如果該 HashMap 已經(jīng)存在相應(yīng)的 Key,那么直接進(jìn)行 aggregate 也就是 func(hashMap.get(Key), Value),比如上面 WordCount 例子中的 func 就是 hashMap.get(Key) + Value,并將 func 的結(jié)果重新 put(key) 到 HashMap 中去。這個(gè) func 功能上相當(dāng)于 reduce(),但實(shí)際處理數(shù)據(jù)的方式與 MapReduce reduce() 有差別,差別相當(dāng)于下面兩段程序的差別。

      // MapReduce
    reduce(K key, Iterable<V> values) {
    result = process(key, values)
    return result
    }

    // Spark
    reduce(K key, Iterable<V> values) {
    result = null
    for (V value : values)
    result
    = func(result, value)
    return result
    }

    MapReduce 可以在 process 函數(shù)里面可以定義任何數(shù)據(jù)結(jié)構(gòu),也可以將部分或全部的 values 都 cache 后再進(jìn)行處理,非常靈活。而 Spark 中的 func 的輸入?yún)?shù)是固定的,一個(gè)是上一個(gè) record 的處理結(jié)果,另一個(gè)是當(dāng)前讀入的 record,它們經(jīng)過 func 處理后的結(jié)果被下一個(gè) record 處理時(shí)使用。因此一些算法比如求平均數(shù),在 process 里面很好實(shí)現(xiàn),直接sum(values)/values.length,而在 Spark 中 func 可以實(shí)現(xiàn)sum(values),但不好實(shí)現(xiàn)/values.length。更多的 func 將會(huì)在下面的章節(jié)細(xì)致分析。

  • fetch 來(lái)的數(shù)據(jù)存放到哪里?剛 fetch 來(lái)的 FileSegment 存放在 softBuffer 緩沖區(qū),經(jīng)過處理后的數(shù)據(jù)放在內(nèi)存 + 磁盤上。這里我們主要討論處理后的數(shù)據(jù),可以靈活設(shè)置這些數(shù)據(jù)是“只用內(nèi)存”還是“內(nèi)存+磁盤”。如果spark.shuffle.spill = false就只用內(nèi)存。內(nèi)存使用的是AppendOnlyMap ,類似 Java 的HashMap,內(nèi)存+磁盤使用的是ExternalAppendOnlyMap,如果內(nèi)存空間不足時(shí),ExternalAppendOnlyMap可以將 \

     records 進(jìn)行 sort 后 spill 到磁盤上,等到需要它們的時(shí)候再進(jìn)行歸并,后面會(huì)詳解。使用“內(nèi)存+磁盤”的一個(gè)主要問題就是如何在兩者之間取得平衡?在 Hadoop MapReduce 中,默認(rèn)將 reducer 的 70% 的內(nèi)存空間用于存放 shuffle 來(lái)的數(shù)據(jù),等到這個(gè)空間利用率達(dá)到 66% 的時(shí)候就開始 merge-combine()-spill。在 Spark 中,也適用同樣的策略,一旦 ExternalAppendOnlyMap 達(dá)到一個(gè)閾值就開始 spill,具體細(xì)節(jié)下面會(huì)討論。
  • 怎么獲得要 fetch 的數(shù)據(jù)的存放位置?在上一章討論物理執(zhí)行圖中的 stage 劃分的時(shí)候,我們強(qiáng)調(diào) “一個(gè) ShuffleMapStage 形成后,會(huì)將該 stage 最后一個(gè) final RDD 注冊(cè)到 MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size),這一步很重要,因?yàn)?shuffle 過程需要 MapOutputTrackerMaster 來(lái)指示 ShuffleMapTask 輸出數(shù)據(jù)的位置”。因此,reducer 在 shuffle 的時(shí)候是要去 driver 里面的 MapOutputTrackerMaster 詢問 ShuffleMapTask 輸出的數(shù)據(jù)位置的。每個(gè) ShuffleMapTask 完成時(shí)會(huì)將 FileSegment 的存儲(chǔ)位置信息匯報(bào)給 MapOutputTrackerMaster。

至此,我們已經(jīng)討論了 shuffle write 和 shuffle read 設(shè)計(jì)的核心思想、算法及某些實(shí)現(xiàn)。接下來(lái),我們深入一些細(xì)節(jié)來(lái)討論。

典型 transformation() 的 shuffle read

1. reduceByKey(func)

上面初步介紹了 reduceByKey() 是如何實(shí)現(xiàn)邊 fetch 邊 reduce() 的。需要注意的是雖然 Example(WordCount) 中給出了各個(gè) RDD 的內(nèi)容,但一個(gè) partition 里面的 records 并不是同時(shí)存在的。比如在 ShuffledRDD 中,每 fetch 來(lái)一個(gè) record 就立即進(jìn)入了 func 進(jìn)行處理。MapPartitionsRDD 中的數(shù)據(jù)是 func 在全部 records 上的處理結(jié)果。從 record 粒度上來(lái)看,reduce() 可以表示如下:

可以看到,fetch 來(lái)的 records 被逐個(gè) aggreagte 到 HashMap 中,等到所有 records 都進(jìn)入 HashMap,就得到最后的處理結(jié)果。唯一要求是 func 必須是 commulative 的(參見上面的 Spark 的 reduce() 的代碼)。
ShuffledRDD 到 MapPartitionsRDD 使用的是 mapPartitionsWithContext 操作。
為了減少數(shù)據(jù)傳輸量,MapReduce 可以在 map 端先進(jìn)行 combine(),其實(shí)在 Spark 也可以實(shí)現(xiàn),只需要將上圖 ShuffledRDD => MapPartitionsRDD 的 mapPartitionsWithContext 在 ShuffleMapStage 中也進(jìn)行一次即可,比如 reduceByKey 例子中 ParallelCollectionRDD => MapPartitionsRDD 完成的就是 map 端的 combine()。
對(duì)比 MapReduce 的 map()-reduce() 和 Spark 中的 reduceByKey():
  • map 端的區(qū)別:map() 沒有區(qū)別。對(duì)于 combine(),MapReduce 先 sort 再 combine(),Spark 直接在 HashMap 上進(jìn)行 combine()。

  • reduce 端區(qū)別:MapReduce 的 shuffle 階段先 fetch 數(shù)據(jù),數(shù)據(jù)量到達(dá)一定規(guī)模后 combine(),再將剩余數(shù)據(jù) merge-sort 后 reduce(),reduce() 非常靈活。Spark 邊 fetch 邊 reduce()(在 HashMap 上執(zhí)行 func),因此要求 func 符合 commulative 的特性。

從內(nèi)存利用上來(lái)對(duì)比:
  • map 端區(qū)別:MapReduce 需要開一個(gè)大型環(huán)形緩沖區(qū)來(lái)暫存和排序 map() 的部分輸出結(jié)果,但 combine() 不需要額外空間(除非用戶自己定義)。Spark 需要 HashMap 內(nèi)存數(shù)據(jù)結(jié)構(gòu)來(lái)進(jìn)行 combine(),同時(shí)輸出 records 到磁盤上時(shí)也需要一個(gè)小的 buffer(bucket)。

  • reduce 端區(qū)別:MapReduce 需要一部分內(nèi)存空間來(lái)存儲(chǔ) shuffle 過來(lái)的數(shù)據(jù),combine() 和 reduce() 不需要額外空間,因?yàn)樗鼈兊妮斎霐?shù)據(jù)分段有序,只需歸并一下就可以得到。在 Spark 中,fetch 時(shí)需要 softBuffer,處理數(shù)據(jù)時(shí)如果只使用內(nèi)存,那么需要 HashMap 來(lái)持有處理后的結(jié)果。如果使用內(nèi)存+磁盤,那么在 HashMap 存放一部分處理后的數(shù)據(jù)。

2. groupByKey(numPartitions)

與 reduceByKey() 流程一樣,只是 func 變成 result = result ++ record.value,功能是將每個(gè) key 對(duì)應(yīng)的所有 values 鏈接在一起。result 來(lái)自 hashMap.get(record.key),計(jì)算后的 result 會(huì)再次被 put 到 hashMap 中。與 reduceByKey() 的區(qū)別就是 groupByKey() 沒有 map 端的 combine()。對(duì)于 groupByKey() 來(lái)說(shuō) map 端的 combine() 只是減少了重復(fù) Key 占用的空間,如果 key 重復(fù)率不高,沒必要 combine(),否則,最好能夠 combine()。

3. distinct(numPartitions)

與 reduceByKey() 流程一樣,只是 func 變成 result = result == null? record.value : result,如果 HashMap 中沒有該 record 就將其放入,否則舍棄。與 reduceByKey() 相同,在map 端存在 combine()。

4. cogroup(otherRDD, numPartitions)

CoGroupedRDD 可能有 0 個(gè)、1 個(gè)或者多個(gè) ShuffleDependency。但并不是要為每一個(gè) ShuffleDependency 建立一個(gè) HashMap,而是所有的 Dependency 共用一個(gè) HashMap。與 reduceByKey() 不同的是,HashMap 在 CoGroupedRDD 的 compute() 中建立,而不是在 mapPartitionsWithContext() 中建立。
粗線表示的 task 首先 new 出一個(gè) Array[ArrayBuffer(), ArrayBuffer()],ArrayBuffer() 的個(gè)數(shù)與參與 cogroup 的 RDD 個(gè)數(shù)相同。func 的邏輯是這樣的:每當(dāng)從 RDD a 中 shuffle 過來(lái)一個(gè) \ record 就將其添加到 hashmap.get(Key) 對(duì)應(yīng)的 Array 中的第一個(gè) ArrayBuffer() 中,每當(dāng)從 RDD b 中 shuffle 過來(lái)一個(gè) record,就將其添加到對(duì)應(yīng)的 Array 中的第二個(gè) ArrayBuffer()。
CoGroupedRDD => MappedValuesRDD 對(duì)應(yīng) mapValues() 操作,就是將 [ArrayBuffer(), ArrayBuffer()] 變成 [Iterable[V], Iterable[W]]。

5. intersection(otherRDD) 和 join(otherRDD, numPartitions)

 這兩個(gè)操作中均使用了 cogroup,所以 shuffle 的處理方式與 cogroup 一樣。

6. sortByKey(ascending, numPartitions)

sortByKey() 中 ShuffledRDD => MapPartitionsRDD 的處理邏輯與 reduceByKey() 不太一樣,沒有使用 HashMap 和 func 來(lái)處理 fetch 過來(lái)的 records。
sortByKey() 中 ShuffledRDD => MapPartitionsRDD 的處理邏輯是:將 shuffle 過來(lái)的一個(gè)個(gè) record 存放到一個(gè) Array 里,然后按照 Key 來(lái)對(duì) Array 中的 records 進(jìn)行 sort。

7. coalesce(numPartitions, shuffle = true)

coalesce() 雖然有 ShuffleDependency,但不需要對(duì) shuffle 過來(lái)的 records 進(jìn)行 aggregate,所以沒有建立 HashMap。每 shuffle 一個(gè) record,就直接流向 CoalescedRDD,進(jìn)而流向 MappedRDD 中。

Shuffle read 中的 HashMap

HashMap 是 Spark shuffle read 過程中頻繁使用的、用于 aggregate 的數(shù)據(jù)結(jié)構(gòu)。Spark 設(shè)計(jì)了兩種:一種是全內(nèi)存的 AppendOnlyMap,另一種是內(nèi)存+磁盤的 ExternalAppendOnlyMap。下面我們來(lái)分析一下兩者特性及內(nèi)存使用情況。

1. AppendOnlyMap

AppendOnlyMap 的官方介紹是 A simple open hash table optimized for the append-only use case, where keys are never removed, but the value for each key may be changed。意思是類似 HashMap,但沒有remove(key)方法。其實(shí)現(xiàn)原理很簡(jiǎn)單,開一個(gè)大 Object 數(shù)組,藍(lán)色部分存儲(chǔ) Key,白色部分存儲(chǔ) Value。如下圖:

當(dāng)要 put(K, V) 時(shí),先 hash(K) 找存放位置,如果存放位置已經(jīng)被占用,就使用 Quadratic probing 探測(cè)方法來(lái)找下一個(gè)空閑位置。對(duì)于圖中的 K6 來(lái)說(shuō),第三次查找找到 K4 后面的空閑位置,放進(jìn)去即可。get(K6) 的時(shí)候類似,找三次找到 K6,取出緊挨著的 V6,與先來(lái)的 value 做 func,結(jié)果重新放到 V6 的位置。
迭代 AppendOnlyMap 中的元素的時(shí)候,從前到后掃描輸出。
如果 Array 的利用率達(dá)到 70%,那么就擴(kuò)張一倍,并對(duì)所有 key 進(jìn)行 rehash 后,重新排列每個(gè) key 的位置。
AppendOnlyMap 還有一個(gè) destructiveSortedIterator(): Iterator[(K, V)] 方法,可以返回 Array 中排序后的 (K, V) pairs。實(shí)現(xiàn)方法很簡(jiǎn)單:先將所有 (K, V) pairs compact 到 Array 的前端,并使得每個(gè) (K, V) 占一個(gè)位置(原來(lái)占兩個(gè)),之后直接調(diào)用 Array.sort() 排序,不過這樣做會(huì)破壞數(shù)組(key 的位置變化了)。

2. ExternalAppendOnlyMap

相比 AppendOnlyMap,ExternalAppendOnlyMap 的實(shí)現(xiàn)略復(fù)雜,但邏輯其實(shí)很簡(jiǎn)單,類似 Hadoop MapReduce 中的 shuffle-merge-combine-sort 過程:
ExternalAppendOnlyMap 持有一個(gè) AppendOnlyMap,shuffle 來(lái)的一個(gè)個(gè) (K, V) record 先 insert 到 AppendOnlyMap 中,insert 過程與原始的 AppendOnlyMap 一模一樣。如果 AppendOnlyMap 快被裝滿時(shí)檢查一下內(nèi)存剩余空間是否可以夠擴(kuò)展,夠就直接在內(nèi)存中擴(kuò)展,不夠就 sort 一下 AppendOnlyMap,將其內(nèi)部所有 records 都 spill 到磁盤上。圖中 spill 了 4 次,每次 spill 完在磁盤上生成一個(gè) spilledMap 文件,然后重新 new 出來(lái)一個(gè) AppendOnlyMap。最后一個(gè) (K, V) record insert 到 AppendOnlyMap 后,表示所有 shuffle 來(lái)的 records 都被放到了 ExternalAppendOnlyMap 中,但不表示 records 已經(jīng)被處理完,因?yàn)槊看?insert 的時(shí)候,新來(lái)的 record 只與 AppendOnlyMap 中的 records 進(jìn)行 aggregate,并不是與所有的 records 進(jìn)行 aggregate(一些 records 已經(jīng)被 spill 到磁盤上了)。因此當(dāng)需要 aggregate 的最終結(jié)果時(shí),需要對(duì) AppendOnlyMap 和所有的 spilledMaps 進(jìn)行全局 merge-aggregate。
全局 merge-aggregate 的流程也很簡(jiǎn)單:先將 AppendOnlyMap 中的 records 進(jìn)行 sort,形成 sortedMap。然后利用 DestructiveSortedIterator 和 DiskMapIterator 分別從 sortedMap 和各個(gè) spilledMap 讀出一部分?jǐn)?shù)據(jù)(StreamBuffer)放到 mergeHeap 里面。StreamBuffer 里面包含的 records 需要具有相同的 hash(key),所以圖中第一個(gè) spilledMap 只讀出前三個(gè) records 進(jìn)入 StreamBuffer。mergeHeap 顧名思義就是使用堆排序不斷提取出 hash(firstRecord.Key) 相同的 StreamBuffer,并將其一個(gè)個(gè)放入 mergeBuffers 中,放入的時(shí)候與已經(jīng)存在于 mergeBuffers 中的 StreamBuffer 進(jìn)行 merge-combine,第一個(gè)被放入 mergeBuffers 的 StreamBuffer 被稱為 minBuffer,那么 minKey 就是 minBuffer 中第一個(gè) record 的 key。當(dāng) merge-combine 的時(shí)候,與 minKey 相同的 records 被 aggregate 一起,然后輸出。整個(gè) merge-combine 在 mergeBuffers 中結(jié)束后,StreamBuffer 剩余的 records 隨著 StreamBuffer 重新進(jìn)入 mergeHeap。一旦某個(gè) StreamBuffer 在 merge-combine 后變?yōu)榭眨ɡ锩娴?records 都被輸出了),那么會(huì)使用 DestructiveSortedIterator 或 DiskMapIterator 重新裝填 hash(key) 相同的 records,然后再重新進(jìn)入 mergeHeap。
整個(gè) insert-merge-aggregate 的過程有三點(diǎn)需要進(jìn)一步探討一下:
  • 內(nèi)存剩余空間檢測(cè)

    與 Hadoop MapReduce 規(guī)定 reducer 中 70% 的空間可用于 shuffle-sort 類似,Spark 也規(guī)定 executor 中 spark.shuffle.memoryFraction * spark.shuffle.safetyFraction 的空間(默認(rèn)是0.3 * 0.8)可用于 ExternalOnlyAppendMap。Spark 略保守是不是?更保守的是這 24% 的空間不是完全用于一個(gè) ExternalOnlyAppendMap 的,而是由在 executor 上同時(shí)運(yùn)行的所有 reducer 共享的。為此,exectuor 專門持有一個(gè) ShuffleMemroyMap: HashMap[threadId, occupiedMemory] 來(lái)監(jiān)控每個(gè) reducer 中 ExternalOnlyAppendMap 占用的內(nèi)存量。每當(dāng) AppendOnlyMap 要擴(kuò)展時(shí),都會(huì)計(jì)算 ShuffleMemroyMap 持有的所有 reducer 中的 AppendOnlyMap 已占用的內(nèi)存 + 擴(kuò)展后的內(nèi)存 是會(huì)否會(huì)大于內(nèi)存限制,大于就會(huì)將 AppendOnlyMap spill 到磁盤。有一點(diǎn)需要注意的是前 1000 個(gè) records 進(jìn)入 AppendOnlyMap 的時(shí)候不會(huì)啟動(dòng)是否要 spill 的檢查,需要擴(kuò)展時(shí)就直接在內(nèi)存中擴(kuò)展。

  • AppendOnlyMap 大小估計(jì)

    為了獲知 AppendOnlyMap 占用的內(nèi)存空間,可以在每次擴(kuò)展時(shí)都將 AppendOnlyMap reference 的所有 objects 大小都算一遍,然后加和,但這樣做非常耗時(shí)。所以 Spark 設(shè)計(jì)了粗略的估算算法,算法時(shí)間復(fù)雜度是 O(1),核心思想是利用 AppendOnlyMap 中每次 insert-aggregate record 后 result 的大小變化及一共 insert 的 records 的個(gè)數(shù)來(lái)估算大小,具體見 SizeTrackingAppendOnlyMap 和 SizeEstimator

  • Spill 過程

    與 shuffle write 一樣,在 spill records 到磁盤上的時(shí)候,會(huì)建立一個(gè) buffer 緩沖區(qū),大小仍為 spark.shuffle.file.buffer.kb ,默認(rèn)是 32KB。另外,由于 serializer 也會(huì)分配緩沖區(qū)用于序列化和反序列化,所以如果一次 serialize 的 records 過多的話緩沖區(qū)會(huì)變得很大。Spark 限制每次 serialize 的 records 個(gè)數(shù)為 spark.shuffle.spill.batchSize,默認(rèn)是 10000。

Discussion

通過本章的介紹可以發(fā)現(xiàn),相比 MapReduce 固定的 shuffle-combine-merge-reduce 策略,Spark 更加靈活,會(huì)根據(jù)不同的 transformation() 的語(yǔ)義去設(shè)計(jì)不同的 shuffle-aggregate 策略,再加上不同的內(nèi)存數(shù)據(jù)結(jié)構(gòu)來(lái)混搭出合理的執(zhí)行流程。
這章主要討論了 Spark 是怎么在不排序 records 的情況下完成 shuffle write 和 shuffle read,以及怎么將 shuffle 過程融入 RDD computing chain 中的。附帶討論了內(nèi)存與磁盤的平衡以及與 Hadoop MapReduce shuffle 的異同。下一章將從部署圖以及進(jìn)程通信角度來(lái)描述 job 執(zhí)行的整個(gè)流程,也會(huì)涉及 shuffle write 和 shuffle read 中的數(shù)據(jù)位置獲取問題。

Spark的Cache和Checkpoint區(qū)別和聯(lián)系拾遺

Spark Job 邏輯執(zhí)行圖和數(shù)據(jù)依賴解析

Spark Job 物理執(zhí)行圖詳解



歡迎點(diǎn)贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連


文章不錯(cuò)?點(diǎn)個(gè)【在看】吧! 
瀏覽 52
點(diǎn)贊
評(píng)論
收藏
分享

手機(jī)掃一掃分享

分享
舉報(bào)
評(píng)論
圖片
表情
推薦
點(diǎn)贊
評(píng)論
收藏
分享

手機(jī)掃一掃分享

分享
舉報(bào)

感谢您访问我们的网站,您可能还对以下资源感兴趣:

国产秋霞理论久久久电影-婷婷色九月综合激情丁香-欧美在线观看乱妇视频-精品国avA久久久久久久-国产乱码精品一区二区三区亚洲人-欧美熟妇一区二区三区蜜桃视频 日韩欧美在线中文字幕| 在线观看日本vs欧洲vs美洲| 色五月在线观看| 欧美特级黄| 大香蕉网站在线观看| 亚州成熟少妇视频在线观看| 一曲二曲三曲在线观看中文字| 黄色三级av| 午夜成人无码视频| 少妇高潮av久久久久久| 亚洲激情综合| 欧美老熟妇乱大交XXXXX| 婷婷99狠狠躁天天| 风流老熟女一区二区三区| 综合久久久久| caobi999| 无码高清在线播放| 午夜影音| 亚洲无码成人视频| 亲子伦视频一区二区三区| 四虎综合网| 夜夜夜夜骑| 做爱激情视频网站| 噜噜视频| 69AV在线视频| 欧美成人午夜| 久久久久久97| 搡老熟女-91Porn| 老司机精品| 日本欧美成人片AAAA| 日本欧美中文| 色男人天堂| 国产精品一区二区在线| 欧美囗交荫蒂AAAA| 北条麻妃无码一区二区| 亚洲男人的天堂视频网在线观看+720P | 在线婷婷| 成人免费视频在线| 国产丰满大乳无码免费播放| 8090操逼网| 搞搞网日本9| 日韩视频在线免费观看| 五月丁香综合激情| 丰满少妇一区二区三区| 18禁黄色免费网站| 影音先锋av无码| 欧美久久久久久久| av三级网站| 人妻免费在线视频| 老司机在线免费视频| 韩国中文字幕HD久久| 亚洲中文在线播放| www.水蜜桃| 久久国产精品久久| 影音先锋国产精品| 久操视频网站| 手机看片1024国产| 一本无码高清| 亚洲有码中文字幕| 一区视频免费观看| 亚洲精品在线视频观看| 中文字幕乱伦视频| 91亚洲精华国产精华精华液| 日本免费一二三区| 一道本视频在线| 中文字幕视频一区日日骚| 国产精品123| 亚洲精品黄色| 国产有码视频| 久久无码一区| 日逼A片| 91亚洲国产AⅤ精品一区二区| 艹逼免费视频| 永久免费无码中文字幕| 亚洲综合中文字幕在线| 七十路の高齢熟女千代子下载| 久久视频在线| 中文字幕日本| 免费一区视频| 婷婷三级| 国产亚洲精品午夜福利巨大软件 | 日韩精品毛片一区二区视频免费 | 性爱av在线| 欧美丰满美乳XXⅩ高潮www| 欧美一级在线免费观看| 大鸡巴免费视频| www.97色| 蜜桃操逼| 无码人妻丰满熟妇区蜜桃| 一级a看片在线观看| 51妺嘿嘿在线电影免费观看 | 精品视频一区二区三区四区| 久热中文字幕| 亚洲无码影视| 国产高清在线| 俺来也操逼| 亚洲无码播放| 欧美无人区码suv| 麻豆成人91精品二区三区| 国产色视频| 亚洲欧美精品| 狠狠躁日日躁夜夜躁A片无码视频 强伦轩一区二区三区四区播放方式 | 五月天丁香社区| 青青草在线视频免费观看| 日韩一级片视频| 黄色AA片| 美日韩毛片| 大鸡巴在线| 国产成人精品在线观看| 99热这里只有精品9| 亚洲免费观看视频| 日韩一级黄色电影| 99精品在线| 欧美色成人免费在线视频| 伊人久久福利视频| 国产一级操逼| 久久精品v| 欧美一区电影| 一级a黄片| 东京热一级片| 你懂的在线免费观看| av黄色网址| 中文无码影院| 在线视频一区二区| 国产日韩一区二区三区| 曰逼视频| 91视频专区| 日本不卡在线视频| 中文字幕亚洲区| 国产精品精品| 亚洲无码视频免费看| 91人人澡人人爽人人看| 国产一区二区三区视频| 成人av黄色三级片在线观看| 瘦精品无码一区二区三区四区五区六区七区八区 | 国产日韩欧美91| 日韩久久婷婷| 成人图片小说| 无码视频在线看| 久久久久一| 日韩AV综合| 日韩欧美在线中文字幕| 福利在线| 中国九九盗摄偷拍偷看| 久操视频免费在线观看| 一本道视频在线| 国产91在线亚洲| 乱子伦】国产精品| 婷婷五月天激情视频| 日韩一级片在线观看| 亚洲草片| 一道本视频在线| 无码中文综合成熟精品AV电影 | 国产女人18毛片水18精品软件| 日韩熟妇人妻中文字幕| 波多野结衣视频免费在线观看| 超碰97老师| 色色一区二区| 成年人免费视频网站| 日本精品在线| 亚洲午夜福利在线观看| 国产成人精品av在线观看| 亚洲免费观看高清完整版在线观| 天天日天天搞| 做爱无码| 黑人巨粗进入疼哭A片| 无码一道本一区二区无码| 青青草在线视频免费观看| 伊人免费视频| 日韩精品123| 熟女探花| 伊人春色AV| 精品视频中文字幕| 91色色色色| 色吧久久| 午夜久久久久久久久久久久91| 成人激情在线观看| 久久免费观看视频| 中文字幕观看| 澳门午夜黄色在线| 国产AV剧情| 国产高清成人| 伊人综合网站| 久草这里只有精品| 午夜福利不卡视频| 北条麻妃二区三区| 欧美日韩日逼视频| 91色秘乱码一区二区| 欧洲操逼视频| 黄色视频网站在线| 少妇性视频| 黄色视频在线观看大全| 女人18片毛片90分钟免费明星| 日韩不卡一区| AV在线免费观看网站| 免费看国产黄色视频| 国内成人精品| 麻豆精东一区二区欧美国产| 在线免费观看av片| 日本电影一区二区| 色中色av| 免费十无码| 日本特级黄A片免费观看| 久久大鸡吧| 国产精品免费av在线| 成人毛片在线大全免费| 69成人在线电影| 国内精品久久久久久久| 日韩无码观看| 亚洲中文字幕久久日| 亚洲AV片一区二区三区| 免费看的操逼视频| 四虎成人精品无码永久在线的客服 | 日韩中文字幕熟妇人妻| 尤物视频网站在线观看| 无码国产精品一区二区视频| 男女啪啪免费视频| 熟妇人妻中文字幕无码老熟妇| 三洞齐开Av在线免费观看| 婷婷在线观看免费| 免费内射网站| 国产精品偷拍| 国产午夜福利电影| 五月丁香婷婷开心| 一插菊花综合视频| 少妇搡BBBB搡BBB搡毛片少妇| 国产精品久久久久无码| 久久国内视频| 国产一毛a一毛a在线观看| 日韩成人免费在线观看| 高清无码不卡视频| 中文在线字幕免费观| 俺来俺去| 亚洲影院中文字幕| 韩国gogogo高清在线完整版| 神马午夜秋霞不卡| 最好看的MV中文字幕国语电影| 色av影音先锋无吗一区| 欧美日屄视频| 人妻无码高清| 欧美一卡二卡| 色婷婷欧美在线播放内射| 中国熟女HD| 91丨熟女丨首页| 亚洲AV无码成人精品区天堂小说 | 日韩高清无码一区| 西西444WWW无码视频软件| 91成人小视频| 成人三级片在线播放| 亚洲无码门| 国产操逼网址| 国产一级二级视频| 中文字幕视频在线| 夜色福利在线看| 日韩一区不卡| 成人自拍视频在线观看| 青青草精品视频| 极品美女援交在线| 国内精品久久久久久久久98| 怡红院男人天堂| 影视先锋久久| 狼人狠干| av无码精品一区| 日韩精品电影| 97男人的天堂| 操逼视频高清无码| 亚洲欧美性爱视频| 开心色婷婷| 无码电影在线播放| 91天堂网| 香蕉漫画在线观看18| 日日舔| 日韩成人无码人妻| 精品蜜桃一区内容| 北条麻妃三区| 伊人久久电影| 99久久黄色| 日韩毛片在线看| 嫩BBB搡BBB槡BBB小号| 骚逼中文字幕| 色婷婷在线综合| 尤物视频网| www.av免费| 精品女人| www.大鸡巴| 97精产国品久久蜜桃臀| 欧美成人内射| 欧美操逼图| 中文字幕在线视频日本| 欧美人妻精品| 中文字幕在线有码| 北条麻妃在线播放一区| 国产丝袜人妖TS系列| 国产精品婷婷| 51妺妺嘿嘿午夜成人| 午夜三级无码| 一卡二卡无码| 1024国产在线| 三级片视频在线观看| 无码视频一区二区| 在线观看亚洲无码视频| 牛牛免费视频| 狼友视频报放| 五十路老国产| 日本成人中文字幕| 久久综合伊人7777777| 京熱大亂交无碼大亂交| 男女啪啪网站| 国产三级无码视频| 成人做爰A片一区二区app| 囯产一级黄片| 中文字幕不卡无码| 一级大黄色毛片| 91探花在线播放| 国产精品操逼视频| 操b视频免费| 国产成人视频免费观看| 竹菊影视一区二区三区| 偷拍-91爱爱| 夜夜操夜夜操| 免费涩涩无遮挡18国产| 伊人9999| 亚洲中文字幕无码在线观看| 亚洲欧美日本在线观看| 99成人乱码一区二区三区在线 | 中文字幕88页| 亚洲a√| 精品无码一区二区| 日韩中文字幕精品| 精品视频免费观看| 欧美18禁黄免费网站| 久久怡春院| 国产欧美日韩在线| 俺也要操| 人人干人人澡| 国产精品在线免费| 无码人妻精品一区二区三千菊电影| 国产骚逼视频| 久在线视频| 成人夜间视频| 九九性爱网| 国产口爆| 国产一区在线看| 国产免费自拍| 欧美性猛交XXXXⅩXX| 国产91在线播放| 影音先锋黄色资源| 午夜看黄片| 日逼高清视频| 91豆花成人网站| 人妖黃色一級A片| 黄色网址在线免费观看| 欧美99在线| 你懂的网址在线观看| 免费一级黄色毛片| 99re免费视频| 亚州一级成人片| 91九色91蝌蚪91成人| 北条麻妃久久视频在线播放| 久热在线精品视频| 91夫妻交友视频| 国产精品黄视频| 91青青草视频| 无码孕妇| 大香蕉在线网| 中文字幕日本| 国产视频一区二区三区四区五区 | 日本黄色片在线播放| 日韩AV高清| 黄色爱爱视频| 91丨九色丨东北熟女| 亚洲爱爱网站| 久久久性爱视频| 91网站在线观看视频| 91人人妻| 中文字幕精品一区| 99热精品在线观看| 亚洲免费观看高清完整版在线观| 操碧一区| 北条麻妃在线中文字幕| 国产精品一区网站| 久久久久久高清毛片一级| 激情无码av| 中文字幕精品在线观看| 无码中文综合成熟精品AV电影 | 欧美V| 黄色A片网址| 国内免费av| 午夜无码福利在线观看| 强伦轩农村人妻| av六月天| 国产一区二区三区在线| 黄A网站| 91人妻人人澡人人爽人人精吕| 99高清无码| 日日Av| 啪啪国产| 国产第七页| 欧美五月激情| 中文字幕永久在线视频v1.0| 性爱二区| 午夜欧美性爱视频| 伊人黄色| 久久精品91| av无码高清| 操逼毛片视频| 日韩东京热中文字幕| 成人午夜av| 熟女资源站| 一道本一区二区| 国产在线拍揄自揄拍无码男男| 99在线免费观看| 大香蕉在线网站| 午夜亚洲国产一区视频网站| 国模一区二区| 国产日韩欧美久久| 国产女人18毛片18精品| 真人BBwBBWBBw另类视频| AAAA毛片视频| 再深点灬好爽灬轻点久久国产| 久久天堂一区| 在线观看黄色电影| 日韩在线免费播放| 午夜精品18码视频国产17c| 日韩小视频在线| 国产3p露脸普通话对白| 日日碰狠狠添| 四虎成人无码A片观看| 精品AV无码一区二区三区| 四季AV综合网站| 18禁网站在线播放| 丁香五月在线观看| 亚洲成色A片77777在线小说 | 五月丁香六月激情综合| 黄色操逼网站?| 91麻豆精品国产| 黄色片一级片| 日韩国产一区| 亚洲秘无码一区二区| 国产精品操逼视频| 老熟妇搡BBBB搡BBBB| 欧美成人福利视频| 99婷婷| AV无码免费一区二区三区不卡| 国产传媒三级| 在线观看视频你懂的| 成人做爰100片免费-百度| 欧美操逼的| 91社区成人影院| 欧美日日| 亚欧成人| 精品九九九九九| 国产三级在线观看| 亚洲av播放| 国产视频123区| 国产欧美一区二区精品性色超碰| 不卡不在线中文| 伊人网导航| 大香蕉网址| 一区二区日本| 色色色色AV| 婷婷五月天在线观看| 无码人妻精品一区二区三区蜜桃91| 中文字幕成人免费视频| av无码aV天天aV天天爽| 日本综合视频| 国产人成视频| 青青在线| 18毛片| 丁香六月婷婷综合缴| 狼人伊人综合| 精品99视频| 毛茸茸BBBBBB毛茸茸| 久久亚洲Aⅴ成人无码国产丝袜| 成人做爰黄A片免费看直播室动漫| 囯产一级a一级a免费视频| 日本一区二区网站| 亚洲精品高清无码| 无码欧美成人| 免费高清无码在线| 黄色国产| 亚洲图片在线观看| 日本免费版网站nba| 黄色电影免费网站| 久久xx| 黄片视频免费播放| 91AV久久| 黄色日逼视频| 亚洲欧洲成人在线| 这里只有精品视频在线| 肏逼在线观看| 国产精品无码怀孕软件| 黄色免费在线观看| 息子交尾一区二区三区| 一级大黄色毛片| 欧美熟妇精品一级A片视色| 丝袜一区二区三区| 亚洲日韩在线视频播放| 亚洲五月婷婷| 日精品| 好吊顶亚洲AV大香蕉色色| 天天撸天天色| 久久三级电影| 亚洲电影中文字幕| 中文字幕成人网站| 亚洲国产精| 日韩另类| 欧美激情亚洲| 国产精品一区二区三区在线| 亚洲国产成人精品女人久久 | 久久福利社| 18禁看网站| 黄片高清免费观看| 91在线无码| 91成人国产| 国产亲子乱婬一级A片借种| 国产乱伦精品视频| 美女被操网站| 麻豆AV在线观看| 午夜AV在线播放| 欧美操B| 综合在线视频| 不卡的一区二区| 久久久国产精品在线| 亚洲无码在线播放| 色综合天天综合成人网| 国产成人亚洲精品| 小早川怜子精品一区二区| 人妻无码中文字幕免费视频蜜桃| 一级AA毛片| 国产精品乱子伦| 无码国产99精品久久久久网站| 日韩大片免费观看| 一级片日韩| 视频一区二区三区在线观看| 天天躁天干天干| 精品久草| 中文字幕淫乱视频欧美| 强伦轩农村人妻| 大鸡吧网| AV黄页| 大香蕉色视频| 成人国产精品秘欧美高清| 欧美性爱网址| 欧美黑吊大战白妞| jizzjizz欧美| 在线欧美日韩| 三级AV在线观看| 欧美aa片| 色婷婷视频在线观看| 人人爽人人澡| 日本高清无码在线观看| 色黄网站在线观看| 欧美在线A片| 日韩免费在线观看一区入口| 日韩高清在线| AV三级片在线观看| 亚洲高清无码免费观看| 91在线永久| 91精品国产乱码久久| 亚洲欧美日韩性爱| 亚洲天堂影院| 影音先锋AV啪啪资源| 中文字幕第315页| 国产一级a一片成人AV| 手机看片1024你懂的| 大地8免费高清视频观看大全 | 三级片在线观看视频| 91人人妻人人澡人人爽| 欧美成人免费A级在线观看| 啊啊啊啊啊在线观看| aaa国产精品| 色婷婷久久久久swag精品| 男人天堂影院| 日韩在观看线| 日本爽妇网| 人人鲁人人操| 夜色88V精品国产亚洲| 欧美综合区| 国产亚洲99久久精品| 亚洲一区| 丰滿老婦BBwBBwBBw| 亚洲无码av在线播放| 超碰久操| 国产内射精品| 国产h在线播放| 国产suv精品一区二区6精华液| 日本一区二区三区免费看| 亚洲精品一区二区三区在线观看| 午夜九九| 国产淫语| 国产视频一区二区三区四区五区 | 免费高清无码在线观看| 97自拍| 日韩无码性爱视频| 成人在线三级片| 久久精品女同亚洲女同13| 婷婷电影网| 91人兽| 色天堂色天使| 久久久性爱| 无码一道本| 特级西西444www高清| 亚洲精品人伦一区二区| 日韩a在线| 亚洲a√| 日本爱爱网址| 亚洲AV无码国产精品久久不卡| 中文字幕在线无码视频| 激情婷婷色五月| 日韩欧美国产黄色电影| 午夜福利啪啪啪| 国产日逼视频| 丁香五月在线观看| 欧美亚洲一区| 无码婷婷| 亚洲人妻电影| 丁香五月影院| 亚洲无码字幕| av在线免费观看网站| 刘玥一区二区三区| 国产91视频在线观看| 多啪啪免费视频| 精品a片| 熟女人妻在线视频| 婷婷国产| 国产婷婷五月| 国产婷婷内射| 大香蕉伊人成人| 亚洲va综合va国产va中文| 韩国GOGOGO高清| 中文无码在线| 五月丁香六月| 亚洲无码电影在线| 一道本在线视频| aⅴ视频| 免费成人毛片| 亚洲久久久久| 国产人人看| 亚洲免费一区二区| 91中文字幕+乱码| 99热黄| 91女人18片女毛片60分钟| 亚洲中文字幕一| 色婷婷日韩精品一区二区三区| 北条麻妃被躁57分钟视频在线| 色婷婷一区二区三区久久| AV网站在线免费观看| 91亚洲精品在线观看| 日本欧美在线视频| 一级性爽AV毛片| AV天天干| 91丨PORNY丨对白| 国产手机精品视频| 操操操综合网| 国产l精品久久久久久久久久 | 一级毛AA片| 日韩av中文在线| 精品无码久久久久久久久app| 91逼逼| 国产一级a毛一级a做免费的视频l 精品国产免费观看久久久_久久天天 | 操屄影院| 国产乱视频| 超碰首页| 日韩无码播放| 一区二区三区毛片| 午夜国产在线| 成人777| 国产一级内射| 51无码| 亚洲无码视频在线| 国产精品视频一区二区三| 亚洲综合社区在线| 午夜福利啪啪啪| 黄片Av| 熟女啪啪| 免费+无码+精品| 香蕉成人网站在线观看| 婷婷国产| 亚洲无码在线精品| 91欧美精品成人综合在线观看| 99国产免费| 五月丁香电影| 一本无码中文字幕| 伊人国产视频| 91久久亚洲| 欧美疯狂做受XXXXX高潮| 九色国产视频| 久草热在线| 国产精品TV| 男人先锋| 91一区| 日本AI高清无码在线观看网址| 中文字幕永久在线5| 伊人久久AV诱惑悠悠| 中文字幕一区二区三区人妻在线视频 | 粉嫩一区二区三区四区| 影音先锋女人资源| 欧美V亚洲| 国产精品一级a毛视频| 国产三级精品三级在线观看| 久久视频一区| 超碰操| 99热网| 色哟哟国产| 国产亚洲欧美在线| 黄色免费网站在线观看| 久久国产精品视频| 日韩黄色免费视频| 日韩大吊| 久久久WWW成人免费无遮挡大片| 国产精品资源在线观看| 51妺嘿嘿在线电影免费观看 | 精品少妇视频| 欧美久色| 日韩无码福利| 影音先锋成人AV资源| 国产com| 色婷婷成人| 精品乱子伦一区二区三区免费播放| 久久久久久精| 一级成人片| 亚洲三级视频| 日本三级AAA三级AAAA97| 热久久视频| 亚洲最新AV网站| 国产精品天天| 综合婷婷久久| 国内自拍一区| 99精品视频在线播放免费| 三级片视频网站| 国产精品剧情| 欧美日韩一区二区三区四区五区六区 | 自拍第一页| 日韩AV手机在线观看| 4438黄色| 亚洲电影AV| 波多野结衣在线网站| 操美女逼逼| 久草天堂| 欧美屄视频| 欧美va视频| 三级在线网| 色情一级AA片免费观看| 爱搞搞网| 国产精品一二区| 国产精品视频| 18禁裸体美女| 婷婷日韩在线| 国产成人自拍偷拍视频| 狠狠干在线观看| 精品国产一区二区三区久久久蜜月 | 免费播放黄色成人片| 人人做人人做人人做,人人做全句下一| 思思热在线观看视频| 91精品人妻一区二| 99久久九九| 天天狠天天干| 日韩美女性爱| 大伊香蕉久久| 人妻av中文字幕| 国产免费麻豆| 日韩毛片在线| 天天搞天天色| 日韩人成| 欧美sesese| 欧美视频第一页| 国产黄色在线视频| 无码啪啪| 亚洲天堂在线免费观看视频| 日韩成人免费观看| 91精品久久久久久久久| 欧美精品在线播放| 久久这里有精品| 在线免费高清无码| 成人先锋影音| 欧一美一色一伦一A片| 国产A视频| 另类老妇奶性BBWBBw| 91视频熟女| 日本一级特黄大片AAAAA级 | 国产一区二区AV| 日韩gay| 永久免费黄色视频网站| 美日韩无码视频| 97大香蕉视频| 黄色A片网站| 一本一道久久a久久精品综合| 嫩草视频在线播放| 91久久人澡人妻人人做人人爽97| 成人免费视频国产免费麻豆,| 91中文字幕在线| 日韩另类| 国产第七页| 日韩爱爱免费视频| 三级片无码在线播放| 国产福利视频| 精品国产123| 国产手机拍视频推荐2023| AV黄色在线观看| 中文字幕成人电影| 波多野结衣无码NET,AV| 亚洲AV秘无码不卡在线观看| 国产AV无码成人精品区| 一区在线播放| 长腿女神打扫偷懒被主人猛操惩罚| 精品一区二区三区四区学生| 中文资源在线a中文| av久草| 中文字幕av高清片,中文在线观看 www一个人免费观看视频www | 五月婷婷色欲| 日本免费版网站nba| 男人天堂视频在线观看| 婷婷五月在线视频| 夜夜嗨av无码一区二区三区| 国产成人精品久久二区二区91 | 国产精品成人无码免费| 亚洲一级免费在线观看| 日本中文字幕在线观看视频| 99精品热视频| 中文一级片| 欧美日本一区二区三区| 综合色色婷婷| 中文字幕99| 免费一级A片| 日韩欧美中文字幕视频| 国产精品免费观看久久久久久久久 | 在线观看中文字幕一区| 日韩欧美操逼视频| 大香蕉中文| 亚洲三级无码视频| 97人人干人人| 天天看天天日| 日本天堂Tv视频在线观看| 人人舔人人爱| 亚洲不卡在线观看| 你懂的视频在线观看| 中文成人在线| 亚洲国产成人一区二区| 国产色视频在线| 欧美一区| 四虎成人精品永久免费AV九九| 国产69页| 8050网午夜| 无码人妻精品一区二区三区温州| 大香蕉伊人综合网| 无码福利电影| 91人妻人人澡人人爽人人精吕| 久9精品| 天天爽夜夜爽人人爽| 中文字幕免费| 超碰人人人人人人人人| 免费播放婬乱男女婬视频国产| 国产色视频| 久久久久久免费一级A片| 91国内偷拍| 日韩AV在线电影| 国产欧美日韩综合精品| 蜜臀久久99精品久久久久酒店更新时间| 亚洲精品a| 青娱乐亚洲精品视频| 日本内射在线播放| 国产毛片在线看| 久久久18禁一区二区三区精品| 九色PORNY国产成人| 日韩高清无码电影| 蜜桃影视| 免费Av在线| 青青草无码| 日产久久久久久| 91久久精品无码一区| 911亚洲精品| 久久国产精品在线| 欧美操b视频| 亚洲国产高清视频| 日韩无码一区二区三| 91国产在线播放| 大香蕉精品在线| 思思热免费视频| 五月天激情性爱| 91大铭哥| 黄频视频| 中文字幕熟女人妻| 国产成人精品无码片区在线观91| 亚洲av大全| 华女与黑人91A∨| 欧美黄色操逼| 国产精品中文| 久久ww| 午夜久久福利| 超碰在线天天干| 亚洲中文字幕av天堂| 国产精品无码激情| 一级婬片A片AAAA毛片A级| 中文字幕成人av| 黄色免费观看网站| 国产福利视频在线| 国产白浆一区二区三区| 91网站免费| 中文无码人妻少妇|