国产秋霞理论久久久电影-婷婷色九月综合激情丁香-欧美在线观看乱妇视频-精品国avA久久久久久久-国产乱码精品一区二区三区亚洲人-欧美熟妇一区二区三区蜜桃视频

Spark數(shù)據(jù)傾斜問(wèn)題解決方案全面總結(jié)

共 18779字,需瀏覽 38分鐘

 ·

2021-09-30 22:50

摘要

本文結(jié)合實(shí)例詳細(xì)闡明了Spark數(shù)據(jù)傾斜的幾種場(chǎng)景以及對(duì)應(yīng)的解決方案,包括避免數(shù)據(jù)源傾斜,調(diào)整并行度,使用自定義Partitioner,使用Map側(cè)Join代替Reduce側(cè)Join,給傾斜Key加上隨機(jī)前綴等。

為何要處理數(shù)據(jù)傾斜(Data Skew)

什么是數(shù)據(jù)傾斜

對(duì)Spark/Hadoop這樣的大數(shù)據(jù)系統(tǒng)來(lái)講,數(shù)據(jù)量大并不可怕,可怕的是數(shù)據(jù)傾斜。

何謂數(shù)據(jù)傾斜?數(shù)據(jù)傾斜指的是,并行處理的數(shù)據(jù)集中,某一部分(如Spark或Kafka的一個(gè)Partition)的數(shù)據(jù)顯著多于其它部分,從而使得該部分的處理速度成為整個(gè)數(shù)據(jù)集處理的瓶頸。

對(duì)于分布式系統(tǒng)而言,理想情況下,隨著系統(tǒng)規(guī)模(節(jié)點(diǎn)數(shù)量)的增加,應(yīng)用整體耗時(shí)線性下降。如果一臺(tái)機(jī)器處理一批大量數(shù)據(jù)需要120分鐘,當(dāng)機(jī)器數(shù)量增加到三時(shí),理想的耗時(shí)為120 / 3 = 40分鐘,如下圖所示

但是,上述情況只是理想情況,實(shí)際上將單機(jī)任務(wù)轉(zhuǎn)換成分布式任務(wù)后,會(huì)有overhead,使得總的任務(wù)量較之單機(jī)時(shí)有所增加,所以每臺(tái)機(jī)器的執(zhí)行時(shí)間加起來(lái)比單臺(tái)機(jī)器時(shí)更大。這里暫不考慮這些overhead,假設(shè)單機(jī)任務(wù)轉(zhuǎn)換成分布式任務(wù)后,總?cè)蝿?wù)量不變?! ?但即使如此,想做到分布式情況下每臺(tái)機(jī)器執(zhí)行時(shí)間是單機(jī)時(shí)的1 / N,就必須保證每臺(tái)機(jī)器的任務(wù)量相等。不幸的是,很多時(shí)候,任務(wù)的分配是不均勻的,甚至不均勻到大部分任務(wù)被分配到個(gè)別機(jī)器上,其它大部分機(jī)器所分配的任務(wù)量只占總得的小部分。比如一臺(tái)機(jī)器負(fù)責(zé)處理80%的任務(wù),另外兩臺(tái)機(jī)器各處理10%的任務(wù),如下圖所示

在上圖中,機(jī)器數(shù)據(jù)增加為三倍,但執(zhí)行時(shí)間只降為原來(lái)的80%,遠(yuǎn)低于理想值?! ?/span>

數(shù)據(jù)傾斜的危害

從上圖可見(jiàn),當(dāng)出現(xiàn)數(shù)據(jù)傾斜時(shí),小量任務(wù)耗時(shí)遠(yuǎn)高于其它任務(wù),從而使得整體耗時(shí)過(guò)大,未能充分發(fā)揮分布式系統(tǒng)的并行計(jì)算優(yōu)勢(shì)?! ?另外,當(dāng)發(fā)生數(shù)據(jù)傾斜時(shí),部分任務(wù)處理的數(shù)據(jù)量過(guò)大,可能造成內(nèi)存不足使得任務(wù)失敗,并進(jìn)而引進(jìn)整個(gè)應(yīng)用失敗。  

數(shù)據(jù)傾斜是如何造成的

在Spark中,同一個(gè)Stage的不同Partition可以并行處理,而具有依賴關(guān)系的不同Stage之間是串行處理的。假設(shè)某個(gè)Spark Job分為Stage 0和Stage 1兩個(gè)Stage,且Stage 1依賴于Stage 0,那Stage 0完全處理結(jié)束之前不會(huì)處理Stage 1。而Stage 0可能包含N個(gè)Task,這N個(gè)Task可以并行進(jìn)行。如果其中N-1個(gè)Task都在10秒內(nèi)完成,而另外一個(gè)Task卻耗時(shí)1分鐘,那該Stage的總時(shí)間至少為1分鐘。換句話說(shuō),一個(gè)Stage所耗費(fèi)的時(shí)間,主要由最慢的那個(gè)Task決定。

由于同一個(gè)Stage內(nèi)的所有Task執(zhí)行相同的計(jì)算,在排除不同計(jì)算節(jié)點(diǎn)計(jì)算能力差異的前提下,不同Task之間耗時(shí)的差異主要由該Task所處理的數(shù)據(jù)量決定。

Stage的數(shù)據(jù)來(lái)源主要分為如下兩類

  • 從數(shù)據(jù)源直接讀取。如讀取HDFS,Kafka

  • 讀取上一個(gè)Stage的Shuffle數(shù)據(jù)

如何緩解/消除數(shù)據(jù)傾斜

避免數(shù)據(jù)源的數(shù)據(jù)傾斜 ———— 讀Kafka

以Spark Stream通過(guò)DirectStream方式讀取Kafka數(shù)據(jù)為例。由于Kafka的每一個(gè)Partition對(duì)應(yīng)Spark的一個(gè)Task(Partition),所以Kafka內(nèi)相關(guān)Topic的各Partition之間數(shù)據(jù)是否平衡,直接決定Spark處理該數(shù)據(jù)時(shí)是否會(huì)產(chǎn)生數(shù)據(jù)傾斜。

如《Kafka設(shè)計(jì)解析(一)- Kafka背景及架構(gòu)介紹》一文所述,Kafka某一Topic內(nèi)消息在不同Partition之間的分布,主要由Producer端所使用的Partition實(shí)現(xiàn)類決定。如果使用隨機(jī)Partitioner,則每條消息會(huì)隨機(jī)發(fā)送到一個(gè)Partition中,從而從概率上來(lái)講,各Partition間的數(shù)據(jù)會(huì)達(dá)到平衡。此時(shí)源Stage(直接讀取Kafka數(shù)據(jù)的Stage)不會(huì)產(chǎn)生數(shù)據(jù)傾斜。

但很多時(shí)候,業(yè)務(wù)場(chǎng)景可能會(huì)要求將具備同一特征的數(shù)據(jù)順序消費(fèi),此時(shí)就需要將具有相同特征的數(shù)據(jù)放于同一個(gè)Partition中。一個(gè)典型的場(chǎng)景是,需要將同一個(gè)用戶相關(guān)的PV信息置于同一個(gè)Partition中。此時(shí),如果產(chǎn)生了數(shù)據(jù)傾斜,則需要通過(guò)其它方式處理。

避免數(shù)據(jù)源的數(shù)據(jù)傾斜 ———— 讀文件

原理

Spark以通過(guò)textFile(path, minPartitions)方法讀取文件時(shí),使用TextFileFormat。

對(duì)于不可切分的文件,每個(gè)文件對(duì)應(yīng)一個(gè)Split從而對(duì)應(yīng)一個(gè)Partition。此時(shí)各文件大小是否一致,很大程度上決定了是否存在數(shù)據(jù)源側(cè)的數(shù)據(jù)傾斜。另外,對(duì)于不可切分的壓縮文件,即使壓縮后的文件大小一致,它所包含的實(shí)際數(shù)據(jù)量也可能差別很多,因?yàn)樵次募?shù)據(jù)重復(fù)度越高,壓縮比越高。反過(guò)來(lái),即使壓縮文件大小接近,但由于壓縮比可能差距很大,所需處理的數(shù)據(jù)量差距也可能很大。

此時(shí)可通過(guò)在數(shù)據(jù)生成端將不可切分文件存儲(chǔ)為可切分文件,或者保證各文件包含數(shù)據(jù)量相同的方式避免數(shù)據(jù)傾斜。

對(duì)于可切分的文件,每個(gè)Split大小由如下算法決定。其中g(shù)oalSize等于所有文件總大小除以minPartitions。而blockSize,如果是HDFS文件,由文件本身的block大小決定;如果是Linux本地文件,且使用本地模式,由fs.local.block.size決定。

protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}

默認(rèn)情況下各Split的大小不會(huì)太大,一般相當(dāng)于一個(gè)Block大?。ㄔ贖adoop 2中,默認(rèn)值為128MB),所以數(shù)據(jù)傾斜問(wèn)題不明顯。如果出現(xiàn)了嚴(yán)重的數(shù)據(jù)傾斜,可通過(guò)上述參數(shù)調(diào)整。

案例

現(xiàn)通過(guò)腳本生成一些文本文件,并通過(guò)如下代碼進(jìn)行簡(jiǎn)單的單詞計(jì)數(shù)。為避免Shuffle,只計(jì)單詞總個(gè)數(shù),不須對(duì)單詞進(jìn)行分組計(jì)數(shù)。

SparkConf sparkConf = new SparkConf()
.setAppName("ReadFileSkewDemo");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
long count = javaSparkContext.textFile(inputFile, minPartitions)
.flatMap((String line) -> Arrays.asList(line.split(" ")).iterator()).count();
System.out.printf("total words : %s", count);
javaSparkContext.stop();

總共生成如下11個(gè)csv文件,其中10個(gè)大小均為271.9MB,另外一個(gè)大小為8.5GB。

之后將8.5GB大小的文件使用gzip壓縮,壓縮后大小僅為25.3MB。

使用如上代碼對(duì)未壓縮文件夾進(jìn)行單詞計(jì)數(shù)操作。Split大小為 max(minSize, min(goalSize, blockSize) = max(1 B, min((271.9 10+8.5 1024) / 1 MB, 128 MB) = 128MB。無(wú)明顯數(shù)據(jù)傾斜。

使用同樣代碼對(duì)包含壓縮文件的文件夾進(jìn)行同樣的單詞計(jì)數(shù)操作。未壓縮文件的Split大小仍然為128MB,而壓縮文件(gzip壓縮)由于不可切分,且大小僅為25.3MB,因此該文件作為一個(gè)單獨(dú)的Split/Partition。雖然該文件相對(duì)較小,但是它由8.5GB文件壓縮而來(lái),包含數(shù)據(jù)量是其它未壓縮文件的32倍,因此處理該Split/Partition/文件的Task耗時(shí)為4.4分鐘,遠(yuǎn)高于其它Task的10秒。

由于上述gzip壓縮文件大小為25.3MB,小于128MB的Split大小,不能證明gzip壓縮文件不可切分?,F(xiàn)將minPartitions從默認(rèn)的1設(shè)置為229,從而目標(biāo)Split大小為max(minSize, min(goalSize, blockSize) = max(1 B, min((271.9 * 10+25.3) / 229 MB, 128 MB) = 12 MB。如果gzip壓縮文件可切分,則所有Split/Partition大小都不會(huì)遠(yuǎn)大于12。反之,如果仍然存在25.3MB的Partition,則說(shuō)明gzip壓縮文件確實(shí)不可切分,在生成不可切分文件時(shí)需要如上文所述保證各文件數(shù)量大大致相同。

如下圖所示,gzip壓縮文件對(duì)應(yīng)的Split/Partition大小為25.3MB,其它Split大小均為12MB左右。而該Task耗時(shí)4.7分鐘,遠(yuǎn)大于其它Task的4秒。

總結(jié)

適用場(chǎng)景 數(shù)據(jù)源側(cè)存在不可切分文件,且文件內(nèi)包含的數(shù)據(jù)量相差較大。

解決方案 盡量使用可切分的格式代替不可切分的格式,或者保證各文件實(shí)際包含數(shù)據(jù)量大致相同。

優(yōu)勢(shì) 可撤底消除數(shù)據(jù)源側(cè)數(shù)據(jù)傾斜,效果顯著。

劣勢(shì) 數(shù)據(jù)源一般來(lái)源于外部系統(tǒng),需要外部系統(tǒng)的支持。

調(diào)整并行度分散同一個(gè)Task的不同Key

原理

Spark在做Shuffle時(shí),默認(rèn)使用HashPartitioner(非Hash Shuffle)對(duì)數(shù)據(jù)進(jìn)行分區(qū)。如果并行度設(shè)置的不合適,可能造成大量不相同的Key對(duì)應(yīng)的數(shù)據(jù)被分配到了同一個(gè)Task上,造成該Task所處理的數(shù)據(jù)遠(yuǎn)大于其它Task,從而造成數(shù)據(jù)傾斜。

如果調(diào)整Shuffle時(shí)的并行度,使得原本被分配到同一Task的不同Key發(fā)配到不同Task上處理,則可降低原Task所需處理的數(shù)據(jù)量,從而緩解數(shù)據(jù)傾斜問(wèn)題造成的短板效應(yīng)。

案例

現(xiàn)有一張測(cè)試表,名為student_external,內(nèi)有10.5億條數(shù)據(jù),每條數(shù)據(jù)有一個(gè)唯一的id值?,F(xiàn)從中取出id取值為9億到10.5億的共1.5億條數(shù)據(jù),并通過(guò)一些處理,使得id為9億到9.4億間的所有數(shù)據(jù)對(duì)12取模后余數(shù)為8(即在Shuffle并行度為12時(shí)該數(shù)據(jù)集全部被HashPartition分配到第8個(gè)Task),其它數(shù)據(jù)集對(duì)其id除以100取整,從而使得id大于9.4億的數(shù)據(jù)在Shuffle時(shí)可被均勻分配到所有Task中,而id小于9.4億的數(shù)據(jù)全部分配到同一個(gè)Task中。處理過(guò)程如下

INSERT OVERWRITE TABLE test
SELECT CASE WHEN id < 940000000 THEN (9500000 + (CAST (RAND() * 8 AS INTEGER)) * 12 )
ELSE CAST(id/100 AS INTEGER)
END,
name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;

通過(guò)上述處理,一份可能造成后續(xù)數(shù)據(jù)傾斜的測(cè)試數(shù)據(jù)即以準(zhǔn)備好。接下來(lái),使用Spark讀取該測(cè)試數(shù)據(jù),并通過(guò)groupByKey(12)對(duì)id分組處理,且Shuffle并行度為12。代碼如下

public class SparkDataSkew {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession.builder()
.appName("SparkDataSkewTunning")
.config("hive.metastore.uris", "thrift://hadoop1:9083")
.enableHiveSupport()
.getOrCreate();

Dataset<Row> dataframe = sparkSession.sql( "select * from test");
dataframe.toJavaRDD()
.mapToPair((Row row) -> new Tuple2<Integer, String>(row.getInt(0),row.getString(1)))
.groupByKey(12)
.mapToPair((Tuple2<Integer, Iterable<String>> tuple) -> {
int id = tuple._1();
AtomicInteger atomicInteger = new AtomicInteger(0);
tuple._2().forEach((String name) -> atomicInteger.incrementAndGet());
return new Tuple2<Integer, Integer>(id, atomicInteger.get());
}).count();

sparkSession.stop();
sparkSession.close();
}

}

本次實(shí)驗(yàn)所使用集群節(jié)點(diǎn)數(shù)為4,每個(gè)節(jié)點(diǎn)可被Yarn使用的CPU核數(shù)為16,內(nèi)存為16GB。使用如下方式提交上述應(yīng)用,將啟動(dòng)4個(gè)Executor,每個(gè)Executor可使用核數(shù)為12(該配置并非生產(chǎn)環(huán)境下的最優(yōu)配置,僅用于本文實(shí)驗(yàn)),可用內(nèi)存為12GB。

spark-submit --queue ambari --num-executors 4 --executor-cores 12 --executor-memory 12g  --class com.jasongj.spark.driver.SparkDataSkew --master yarn --deploy-mode client SparkExample-with-dependencies-1.0.jar

GroupBy Stage的Task狀態(tài)如下圖所示,Task 8處理的記錄數(shù)為4500萬(wàn),遠(yuǎn)大于(9倍于)其它11個(gè)Task處理的500萬(wàn)記錄。而Task 8所耗費(fèi)的時(shí)間為38秒,遠(yuǎn)高于其它11個(gè)Task的平均時(shí)間(16秒)。整個(gè)Stage的時(shí)間也為38秒,該時(shí)間主要由最慢的Task 8決定。

在這種情況下,可以通過(guò)調(diào)整Shuffle并行度,使得原來(lái)被分配到同一個(gè)Task(即該例中的Task 8)的不同Key分配到不同Task,從而降低Task 8所需處理的數(shù)據(jù)量,緩解數(shù)據(jù)傾斜。

通過(guò)groupByKey(48)將Shuffle并行度調(diào)整為48,重新提交到Spark。新的Job的GroupBy Stage所有Task狀態(tài)如下圖所示。

從上圖可知,記錄數(shù)最多的Task 20處理的記錄數(shù)約為1125萬(wàn),相比于并行度為12時(shí)Task 8的4500萬(wàn),降低了75%左右,而其耗時(shí)從原來(lái)Task 8的38秒降到了24秒。

在這種場(chǎng)景下,調(diào)整并行度,并不意味著一定要增加并行度,也可能是減小并行度。如果通過(guò)groupByKey(11)將Shuffle并行度調(diào)整為11,重新提交到Spark。新Job的GroupBy Stage的所有Task狀態(tài)如下圖所示。

從上圖可見(jiàn),處理記錄數(shù)最多的Task 6所處理的記錄數(shù)約為1045萬(wàn),耗時(shí)為23秒。處理記錄數(shù)最少的Task 1處理的記錄數(shù)約為545萬(wàn),耗時(shí)12秒。

總結(jié)

適用場(chǎng)景 大量不同的Key被分配到了相同的Task造成該Task數(shù)據(jù)量過(guò)大。

解決方案 調(diào)整并行度。一般是增大并行度,但有時(shí)如本例減小并行度也可達(dá)到效果。

優(yōu)勢(shì) 實(shí)現(xiàn)簡(jiǎn)單,可在需要Shuffle的操作算子上直接設(shè)置并行度或者使用spark.default.parallelism設(shè)置。如果是Spark SQL,還可通過(guò)SET spark.sql.shuffle.partitions=[num_tasks]設(shè)置并行度。可用最小的代價(jià)解決問(wèn)題。一般如果出現(xiàn)數(shù)據(jù)傾斜,都可以通過(guò)這種方法先試驗(yàn)幾次,如果問(wèn)題未解決,再嘗試其它方法。

劣勢(shì) 適用場(chǎng)景少,只能將分配到同一Task的不同Key分散開(kāi),但對(duì)于同一Key傾斜嚴(yán)重的情況該方法并不適用。并且該方法一般只能緩解數(shù)據(jù)傾斜,沒(méi)有徹底消除問(wèn)題。從實(shí)踐經(jīng)驗(yàn)來(lái)看,其效果一般。

自定義Partitioner

原理

使用自定義的Partitioner(默認(rèn)為HashPartitioner),將原本被分配到同一個(gè)Task的不同Key分配到不同Task。

案例

以上述數(shù)據(jù)集為例,繼續(xù)將并發(fā)度設(shè)置為12,但是在groupByKey算子上,使用自定義的Partitioner(實(shí)現(xiàn)如下)

    .groupByKey(new Partitioner() {
@Override
public int numPartitions() {
return 12;
}

@Override
public int getPartition(Object key) {
int id = Integer.parseInt(key.toString());
if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
return (id - 9500000) / 12;
} else {
return id % 12;
}
}
})

由下圖可見(jiàn),使用自定義Partition后,耗時(shí)最長(zhǎng)的Task 6處理約1000萬(wàn)條數(shù)據(jù),用時(shí)15秒。并且各Task所處理的數(shù)據(jù)集大小相當(dāng)。

總結(jié)

適用場(chǎng)景 大量不同的Key被分配到了相同的Task造成該Task數(shù)據(jù)量過(guò)大。

解決方案 使用自定義的Partitioner實(shí)現(xiàn)類代替默認(rèn)的HashPartitioner,盡量將所有不同的Key均勻分配到不同的Task中。

優(yōu)勢(shì) 不影響原有的并行度設(shè)計(jì)。如果改變并行度,后續(xù)Stage的并行度也會(huì)默認(rèn)改變,可能會(huì)影響后續(xù)Stage。

劣勢(shì) 適用場(chǎng)景有限,只能將不同Key分散開(kāi),對(duì)于同一Key對(duì)應(yīng)數(shù)據(jù)集非常大的場(chǎng)景不適用。效果與調(diào)整并行度類似,只能緩解數(shù)據(jù)傾斜而不能完全消除數(shù)據(jù)傾斜。而且需要根據(jù)數(shù)據(jù)特點(diǎn)自定義專用的Partitioner,不夠靈活。

將Reduce side Join轉(zhuǎn)變?yōu)镸ap side Join

原理 通過(guò)Spark的Broadcast機(jī)制,將Reduce側(cè)Join轉(zhuǎn)化為Map側(cè)Join,避免Shuffle從而完全消除Shuffle帶來(lái)的數(shù)據(jù)傾斜。

案例

通過(guò)如下SQL創(chuàng)建一張具有傾斜Key且總記錄數(shù)為1.5億的大表test。

INSERT OVERWRITE TABLE test
SELECT CAST(CASE WHEN id < 980000000 THEN (95000000 + (CAST (RAND() * 4 AS INT) + 1) * 48 )
ELSE CAST(id/10 AS INT) END AS STRING),
name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;

使用如下SQL創(chuàng)建一張數(shù)據(jù)分布均勻且總記錄數(shù)為50萬(wàn)的小表test_new。

INSERT OVERWRITE TABLE test_new
SELECT CAST(CAST(id/10 AS INT) AS STRING),
name
FROM student_delta_external
WHERE id BETWEEN 950000000 AND 950500000;

直接通過(guò)Spark Thrift Server提交如下SQL將表test與表test_new進(jìn)行Join并將Join結(jié)果存于表test_join中。

INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

該SQL對(duì)應(yīng)的DAG如下圖所示。從該圖可見(jiàn),該執(zhí)行過(guò)程總共分為三個(gè)Stage,前兩個(gè)用于從Hive中讀取數(shù)據(jù),同時(shí)二者進(jìn)行Shuffle,通過(guò)最后一個(gè)Stage進(jìn)行Join并將結(jié)果寫入表test_join中。

從下圖可見(jiàn),Join Stage各Task處理的數(shù)據(jù)傾斜嚴(yán)重,處理數(shù)據(jù)量最大的Task耗時(shí)7.1分鐘,遠(yuǎn)高于其它無(wú)數(shù)據(jù)傾斜的Task約2秒的耗時(shí)。

接下來(lái),嘗試通過(guò)Broadcast實(shí)現(xiàn)Map側(cè)Join。實(shí)現(xiàn)Map側(cè)Join的方法,并非直接通過(guò)CACHE TABLE test_new將小表test_new進(jìn)行cache?,F(xiàn)通過(guò)如下SQL進(jìn)行Join。

CACHE TABLE test_new;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

通過(guò)如下DAG圖可見(jiàn),該操作仍分為三個(gè)Stage,且仍然有Shuffle存在,唯一不同的是,小表的讀取不再直接掃描Hive表,而是掃描內(nèi)存中緩存的表。

并且數(shù)據(jù)傾斜仍然存在。如下圖所示,最慢的Task耗時(shí)為7.1分鐘,遠(yuǎn)高于其它Task的約2秒。

正確的使用Broadcast實(shí)現(xiàn)Map側(cè)Join的方式是,通過(guò)SET spark.sql.autoBroadcastJoinThreshold=104857600;將Broadcast的閾值設(shè)置得足夠大。

再次通過(guò)如下SQL進(jìn)行Join。

SET spark.sql.autoBroadcastJoinThreshold=104857600;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

通過(guò)如下DAG圖可見(jiàn),該方案只包含一個(gè)Stage。

并且從下圖可見(jiàn),各Task耗時(shí)相當(dāng),無(wú)明顯數(shù)據(jù)傾斜現(xiàn)象。并且總耗時(shí)為1.5分鐘,遠(yuǎn)低于Reduce側(cè)Join的7.3分鐘。

總結(jié)

適用場(chǎng)景 參與Join的一邊數(shù)據(jù)集足夠小,可被加載進(jìn)Driver并通過(guò)Broadcast方法廣播到各個(gè)Executor中。

解決方案 在Java/Scala代碼中將小數(shù)據(jù)集數(shù)據(jù)拉取到Driver,然后通過(guò)Broadcast方案將小數(shù)據(jù)集的數(shù)據(jù)廣播到各Executor。或者在使用SQL前,將Broadcast的閾值調(diào)整得足夠大,從而使用Broadcast生效。進(jìn)而將Reduce側(cè)Join替換為Map側(cè)Join。

優(yōu)勢(shì) 避免了Shuffle,徹底消除了數(shù)據(jù)傾斜產(chǎn)生的條件,可極大提升性能。

劣勢(shì) 要求參與Join的一側(cè)數(shù)據(jù)集足夠小,并且主要適用于Join的場(chǎng)景,不適合聚合的場(chǎng)景,適用條件有限。

為skew的key增加隨機(jī)前/后綴

原理 為數(shù)據(jù)量特別大的Key增加隨機(jī)前/后綴,使得原來(lái)Key相同的數(shù)據(jù)變?yōu)镵ey不相同的數(shù)據(jù),從而使傾斜的數(shù)據(jù)集分散到不同的Task中,徹底解決數(shù)據(jù)傾斜問(wèn)題。Join另一則的數(shù)據(jù)中,與傾斜Key對(duì)應(yīng)的部分?jǐn)?shù)據(jù),與隨機(jī)前綴集作笛卡爾乘積,從而保證無(wú)論數(shù)據(jù)傾斜側(cè)傾斜Key如何加前綴,都能與之正常Join。

案例

通過(guò)如下SQL,將id為9億到9.08億共800萬(wàn)條數(shù)據(jù)的id轉(zhuǎn)為9500048或者9500096,其它數(shù)據(jù)的id除以100取整。從而該數(shù)據(jù)集中,id為9500048和9500096的數(shù)據(jù)各400萬(wàn),其它id對(duì)應(yīng)的數(shù)據(jù)記錄數(shù)均為100條。這些數(shù)據(jù)存于名為test的表中。

對(duì)于另外一張小表test_new,取出50萬(wàn)條數(shù)據(jù),并將id(遞增且唯一)除以100取整,使得所有id都對(duì)應(yīng)100條數(shù)據(jù)。

INSERT OVERWRITE TABLE test
SELECT CAST(CASE WHEN id < 908000000 THEN (9500000 + (CAST (RAND() * 2 AS INT) + 1) * 48 )
ELSE CAST(id/100 AS INT) END AS STRING),
name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;

INSERT OVERWRITE TABLE test_new
SELECT CAST(CAST(id/100 AS INT) AS STRING),
name
FROM student_delta_external
WHERE id BETWEEN 950000000 AND 950500000;

通過(guò)如下代碼,讀取test表對(duì)應(yīng)的文件夾內(nèi)的數(shù)據(jù)并轉(zhuǎn)換為JavaPairRDD存于leftRDD中,同樣讀取test表對(duì)應(yīng)的數(shù)據(jù)存于rightRDD中。通過(guò)RDD的join算子對(duì)leftRDD與rightRDD進(jìn)行Join,并指定并行度為48。

public class SparkDataSkew{
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("DemoSparkDataFrameWithSkewedBigTableDirect");
sparkConf.set("spark.default.parallelism", String.valueOf(parallelism));
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
.mapToPair((String row) -> {
String[] str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});

JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
.mapToPair((String row) -> {
String[] str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});

leftRDD.join(rightRDD, parallelism)
.mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2()))
.foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
AtomicInteger atomicInteger = new AtomicInteger();
iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
});

javaSparkContext.stop();
javaSparkContext.close();
}
}

從下圖可看出,整個(gè)Join耗時(shí)1分54秒,其中Join Stage耗時(shí)1.7分鐘。

通過(guò)分析Join Stage的所有Task可知,在其它Task所處理記錄數(shù)為192.71萬(wàn)的同時(shí)Task 32的處理的記錄數(shù)為992.72萬(wàn),故它耗時(shí)為1.7分鐘,遠(yuǎn)高于其它Task的約10秒。這與上文準(zhǔn)備數(shù)據(jù)集時(shí),將id為9500048為9500096對(duì)應(yīng)的數(shù)據(jù)量設(shè)置非常大,其它id對(duì)應(yīng)的數(shù)據(jù)集非常均勻相符合。

現(xiàn)通過(guò)如下操作,實(shí)現(xiàn)傾斜Key的分散處理

  • 將leftRDD中傾斜的key(即9500048與9500096)對(duì)應(yīng)的數(shù)據(jù)單獨(dú)過(guò)濾出來(lái),且加上1到24的隨機(jī)前綴,并將前綴與原數(shù)據(jù)用逗號(hào)分隔(以方便之后去掉前綴)形成單獨(dú)的leftSkewRDD

  • 將rightRDD中傾斜key對(duì)應(yīng)的數(shù)據(jù)抽取出來(lái),并通過(guò)flatMap操作將該數(shù)據(jù)集中每條數(shù)據(jù)均轉(zhuǎn)換為24條數(shù)據(jù)(每條分別加上1到24的隨機(jī)前綴),形成單獨(dú)的rightSkewRDD

  • 將leftSkewRDD與rightSkewRDD進(jìn)行Join,并將并行度設(shè)置為48,且在Join過(guò)程中將隨機(jī)前綴去掉,得到傾斜數(shù)據(jù)集的Join結(jié)果skewedJoinRDD

  • 將leftRDD中不包含傾斜Key的數(shù)據(jù)抽取出來(lái)作為單獨(dú)的leftUnSkewRDD

  • 對(duì)leftUnSkewRDD與原始的rightRDD進(jìn)行Join,并行度也設(shè)置為48,得到Join結(jié)果unskewedJoinRDD

  • 通過(guò)union算子將skewedJoinRDD與unskewedJoinRDD進(jìn)行合并,從而得到完整的Join結(jié)果集

具體實(shí)現(xiàn)代碼如下

public class SparkDataSkew{
public static void main(String[] args) {
int parallelism = 48;
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("SolveDataSkewWithRandomPrefix");
sparkConf.set("spark.default.parallelism", parallelism + "");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
.mapToPair((String row) -> {
String[] str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});

JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
.mapToPair((String row) -> {
String[] str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});

String[] skewedKeyArray = new String[]{"9500048", "9500096"};
Set<String> skewedKeySet = new HashSet<String>();
List<String> addList = new ArrayList<String>();
for(int i = 1; i <=24; i++) {
addList.add(i + "");
}
for(String key : skewedKeyArray) {
skewedKeySet.add(key);
}

Broadcast<Set<String>> skewedKeys = javaSparkContext.broadcast(skewedKeySet);
Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);

JavaPairRDD<String, String> leftSkewRDD = leftRDD
.filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
.mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>((new Random().nextInt(24) + 1) + "," + tuple._1(), tuple._2()));

JavaPairRDD<String, String> rightSkewRDD = rightRDD.filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
.flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
.map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2()))
.collect(Collectors.toList())
.iterator()
);

JavaPairRDD<String, String> skewedJoinRDD = leftSkewRDD
.join(rightSkewRDD, parallelism)
.mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2()));

JavaPairRDD<String, String> leftUnSkewRDD = leftRDD.filter((Tuple2<String, String> tuple) -> !skewedKeys.value().contains(tuple._1()));
JavaPairRDD<String, String> unskewedJoinRDD = leftUnSkewRDD.join(rightRDD, parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2()));

skewedJoinRDD.union(unskewedJoinRDD).foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
AtomicInteger atomicInteger = new AtomicInteger();
iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
});

javaSparkContext.stop();
javaSparkContext.close();
}
}

從下圖可看出,整個(gè)Join耗時(shí)58秒,其中Join Stage耗時(shí)33秒。

通過(guò)分析Join Stage的所有Task可知

  • 由于Join分傾斜數(shù)據(jù)集Join和非傾斜數(shù)據(jù)集Join,而各Join的并行度均為48,故總的并行度為96

  • 由于提交任務(wù)時(shí),設(shè)置的Executor個(gè)數(shù)為4,每個(gè)Executor的core數(shù)為12,故可用Core數(shù)為48,所以前48個(gè)Task同時(shí)啟動(dòng)(其Launch時(shí)間相同),后48個(gè)Task的啟動(dòng)時(shí)間各不相同(等待前面的Task結(jié)束才開(kāi)始)

  • 由于傾斜Key被加上隨機(jī)前綴,原本相同的Key變?yōu)椴煌腒ey,被分散到不同的Task處理,故在所有Task中,未發(fā)現(xiàn)所處理數(shù)據(jù)集明顯高于其它Task的情況

實(shí)際上,由于傾斜Key與非傾斜Key的操作完全獨(dú)立,可并行進(jìn)行。而本實(shí)驗(yàn)受限于可用總核數(shù)為48,可同時(shí)運(yùn)行的總Task數(shù)為48,故而該方案只是將總耗時(shí)減少一半(效率提升一倍)。如果資源充足,可并發(fā)執(zhí)行Task數(shù)增多,該方案的優(yōu)勢(shì)將更為明顯。在實(shí)際項(xiàng)目中,該方案往往可提升數(shù)倍至10倍的效率。

總結(jié)

適用場(chǎng)景 兩張表都比較大,無(wú)法使用Map則Join。其中一個(gè)RDD有少數(shù)幾個(gè)Key的數(shù)據(jù)量過(guò)大,另外一個(gè)RDD的Key分布較為均勻。

解決方案 將有數(shù)據(jù)傾斜的RDD中傾斜Key對(duì)應(yīng)的數(shù)據(jù)集單獨(dú)抽取出來(lái)加上隨機(jī)前綴,另外一個(gè)RDD每條數(shù)據(jù)分別與隨機(jī)前綴結(jié)合形成新的RDD(相當(dāng)于將其數(shù)據(jù)增到到原來(lái)的N倍,N即為隨機(jī)前綴的總個(gè)數(shù)),然后將二者Join并去掉前綴。然后將不包含傾斜Key的剩余數(shù)據(jù)進(jìn)行Join。最后將兩次Join的結(jié)果集通過(guò)union合并,即可得到全部Join結(jié)果。

優(yōu)勢(shì) 相對(duì)于Map則Join,更能適應(yīng)大數(shù)據(jù)集的Join。如果資源充足,傾斜部分?jǐn)?shù)據(jù)集與非傾斜部分?jǐn)?shù)據(jù)集可并行進(jìn)行,效率提升明顯。且只針對(duì)傾斜部分的數(shù)據(jù)做數(shù)據(jù)擴(kuò)展,增加的資源消耗有限。

劣勢(shì) 如果傾斜Key非常多,則另一側(cè)數(shù)據(jù)膨脹非常大,此方案不適用。而且此時(shí)對(duì)傾斜Key與非傾斜Key分開(kāi)處理,需要掃描數(shù)據(jù)集兩遍,增加了開(kāi)銷。

大表隨機(jī)添加N種隨機(jī)前綴,小表擴(kuò)大N倍

原理

如果出現(xiàn)數(shù)據(jù)傾斜的Key比較多,上一種方法將這些大量的傾斜Key分拆出來(lái),意義不大。此時(shí)更適合直接對(duì)存在數(shù)據(jù)傾斜的數(shù)據(jù)集全部加上隨機(jī)前綴,然后對(duì)另外一個(gè)不存在嚴(yán)重?cái)?shù)據(jù)傾斜的數(shù)據(jù)集整體與隨機(jī)前綴集作笛卡爾乘積(即將數(shù)據(jù)量擴(kuò)大N倍)。

案例

這里給出示例代碼,讀者可參考上文中分拆出少數(shù)傾斜Key添加隨機(jī)前綴的方法,自行測(cè)試。

public class SparkDataSkew {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("ResolveDataSkewWithNAndRandom");
sparkConf.set("spark.default.parallelism", parallelism + "");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
.mapToPair((String row) -> {
String[] str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});

JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
.mapToPair((String row) -> {
String[] str = row.split(",");
return new Tuple2<String, String>(str[0], str[1]);
});

List<String> addList = new ArrayList<String>();
for(int i = 1; i <=48; i++) {
addList.add(i + "");
}

Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);

JavaPairRDD<String, String> leftRandomRDD = leftRDD.mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>(new Random().nextInt(48) + "," + tuple._1(), tuple._2()));

JavaPairRDD<String, String> rightNewRDD = rightRDD
.flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
.map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2()))
.collect(Collectors.toList())
.iterator()
);

JavaPairRDD<String, String> joinRDD = leftRandomRDD
.join(rightNewRDD, parallelism)
.mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2()));

joinRDD.foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
AtomicInteger atomicInteger = new AtomicInteger();
iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
});

javaSparkContext.stop();
javaSparkContext.close();
}
}

總結(jié)

適用場(chǎng)景 一個(gè)數(shù)據(jù)集存在的傾斜Key比較多,另外一個(gè)數(shù)據(jù)集數(shù)據(jù)分布比較均勻。

優(yōu)勢(shì) 對(duì)大部分場(chǎng)景都適用,效果不錯(cuò)。

劣勢(shì) 需要將一個(gè)數(shù)據(jù)集整體擴(kuò)大N倍,會(huì)增加資源消耗。

總結(jié)

對(duì)于數(shù)據(jù)傾斜,并無(wú)一個(gè)統(tǒng)一的一勞永逸的方法。更多的時(shí)候,是結(jié)合數(shù)據(jù)特點(diǎn)(數(shù)據(jù)集大小,傾斜Key的多少等)綜合使用上文所述的多種方法。

瀏覽 48
點(diǎn)贊
評(píng)論
收藏
分享

手機(jī)掃一掃分享

分享
舉報(bào)
評(píng)論
圖片
表情
推薦
點(diǎn)贊
評(píng)論
收藏
分享

手機(jī)掃一掃分享

分享
舉報(bào)

感谢您访问我们的网站,您可能还对以下资源感兴趣:

国产秋霞理论久久久电影-婷婷色九月综合激情丁香-欧美在线观看乱妇视频-精品国avA久久久久久久-国产乱码精品一区二区三区亚洲人-欧美熟妇一区二区三区蜜桃视频 人妻天天爽| 中字无码| a在线免费观看| 欧洲成人在线观看| 亚洲男人的天堂视频网在线观看+720P | 日韩视频一区| 中文字幕在线观看亚洲| 国产精品午夜福利| 成人欧美精品区二区三| 中文字幕在线观看二区| 国产女人在线视频| 黄色小说在线看| 国产91黄色| 蜜桃免费AV| 欧美日韩中文字幕无码| 99视频内射三四| 日老女人逼| 免费手机av| 欧美日韩中文在线| 国产精品你懂得| 日韩av免费在线| 香蕉视频a| 91久久久裸身美女| 国产色五月视频| 日本在线免费观看| 成人在线视频网| 在线播放JUY-925被丈夫上司侵犯的第7天| 人人操综合| 影音先锋成人无码| 日韩三级网| 大香蕉草久| 自慰在线观看网站| 91大熟女91大腚女人| 精品国产免费观看久久久_久久天天 | 久久六月天| 美女性爱3P视频| 国产日逼视频| 免费中文字幕日韩欧美| 无码精品ThePorn| 国产成人无码一区二区在线| 好吊看视频| 99久在线精品99re8热| 操逼视频免费网站| 国产精品久久久999| 各种BBwBBwBBwBBw| 日本黄色视频在线免费观看| 成av人片一区二区三区久久| 少妇搡BBBB搡BBB搡小说 | www.黄色在线| 久久久久久久久久久久久久久久久久免费精品分类视频 | 一本一道久久综合狠狠躁牛牛影视| 欧美老女人操逼| 蜜臀999| 中文字幕日韩在线视频| 亚洲无码99| 91人人妻人人澡人人爽人人| 午夜99| 国产福利免费| 亚洲黄色影视| 黄色视频免费播放| 免费看黄色一级片| 亚洲国产精品午夜福利| 欧美性BBB槡BBB槡BBB| 黄色A片电影| 亚洲AV成人无码AV小说| 亚洲V无码| 日韩第1页| 亚洲观看黄色网| 桃花岛tⅴ+亚洲品质| 北条麻妃AV观看| 中文字幕++中文字幕明步| 99在线观看免费视频| 中国九九盗摄偷拍偷看| www.蜜桃视频| 亚洲欧美v在线视频| 六十路老熟女码视频| www.jiujiujiu| 国产精品做爱| 成人A片免费在线观看| 手机看片1024你懂的| 日韩天天操| 人人操人人插| 天天爽天天干| 99国产在线观看| 免费看一区二区三区A片| 日韩激情网| 婷婷五月中文字幕| 男人先锋| 亚洲中文字幕第一页| 久久精品福利| 亚洲九九| 一级黄色视频日逼片| 国产做受精品网站在线观看| 天堂在线视频免费| 日韩七区| 视频國产在线| 国产精品成人免费| 日本免费版网站nba| 天天操天天日天天操| 99视频在线精品| 婷婷五月天成人| 国产精品99视频| 熟女AV888| V在线| 久久婷婷六月综合| 四川少妇BBBB槡BBBB槡| 91网站在线看| 成人在线视频免费| 精品人妻少妇| 香蕉操逼小视频| 国产欧美日韩成人| 成人亚洲天堂| 色婷婷AV在线| 一级AV在线| 午夜福利澳| 人人干国产| 国产一片黑夜内射| 中文字幕一区在线| a在线观看视频| 九九这里有精品| 日韩成人中文字幕| 亚洲一级黄色片| 青青草原在线| 一道本高清无码| av无码观看| 91人妻人人操人人爽| 在线观看老湿视频福利| 精品99999| 蜜桃视频一区二区三区四区使用方法| 一道本av| 亚洲理论片| 中文字幕一区二区三区四区| 一区二区成人电影| 超碰97在线精品国产| 大香蕉免费| 特黄色视频| 久久综合伊人7777777| 久草电影网站| 五月激情六月婷婷| 国产操逼的视频| 少妇高潮喷水| 美女黄片| 午夜精品视频| 18禁网站网址| 日韩黄色免费网站| 老太老熟女城中层露脸60| 午夜AV在线免费观看| 91免费高清视频| 欧美日韩成人在线视频| 青青青在线视频| 97人妻精品黄网站| 大香蕉玖玖| 青青草娱乐视频| 免费无码在线播放| 内射一区| 91麻豆成人| 黄色视频在线观看亚洲一区二区三区免费 | 成人三区| 91丨九色丨蝌蚪丨肥女| 欧美怡红院视频| 亚洲专区区免费| 日韩精品在线免费| 嫩小槡BBBB槡BBBB槡漫画| 欧美性爱中文字幕| 亚洲精品一区无码A片丁香花| 久色婷婷在线| 国产超碰青青草| 男人天堂大香蕉| 男女做爱视频网站| 天天拍夜夜操| 开心色色五月天| 无码人妻av一区| 夜夜骚av.一区二区三区四区| 天天摸天天日| 荫蒂添出高潮A片视频| 国产亲子乱A片免费视频| 国产又大又粗又爽| 一卡二卡三卡无码| 日韩欧美成人在线视频| 正在播放JUQ-878木下凛凛子| 嫩BBB槡BBBB槡BBB| 婷婷久草网| 天天射天天日天天干| 天天搞天天搞| 中文字幕精品亚洲熟女| 国产高清视频在线| 777性爱| 欧美狠狠插| 熟女天堂| 青草精品视频| 99久久免费网| 色XXX| 大炕上公让我高潮了六次| 婷婷五月综合在线| 91豆花视频18| 亚洲无码免费视频在线观看| 尤物视频在线观看视频| 中文无码一区二区三区| 日批视频网站| 蜜桃Av| 成人理论片| 97人妻精品一区二区三区图片 | 欧美激情一级| 中文无码播放| 日日精品| 久草视频免费在线播放| 91中文字幕| 国产三级成人| 国产精品美女久久久久AV爽 | 91一区在线观看| 九九九九九九精品| 琪琪色视频| 黄色视频| 色综合社区| 免费无码婬片AAAA片老婦| 国产三级91| 国精产品一区一区三区四区| 996热re视频精品视频| 蜜臀AV在线播放| AV在线直播| 久草新| 福利导航视频| 久久久97| 人妻电影亚洲av| 另类aV| 日韩高清无码三级片| 欧美9999| 天天噜天天操| 特级西西人体444www高清大胆 | 色片免费| 罗莉AV| 亚洲色情视频| 日韩Va| 91精品久久久久久久久久| 91在线小视频| 操人| 日韩网站在线观看| 国产一卡二卡三卡| 俺来俺去www色婷婷| 一级AA视频| 欧美日韩中文字幕| 色色网站免费| 国产激情在线观看视频| 亚洲欧美性爱视频| 欧美footjob| 超碰在线人妻| 在线观看无码高清| 懂色av懂色av粉嫩av分享吧 | 国产精品无码天天爽视频| 国精产品秘一区二区-| 大香蕉网站视频| 国产9熟妇视频网站| 毛片二区| 国产成人三级片在线观看| 一本色综合亚洲精品| 大香蕉操逼视频| 操B视频在线免费观看| 可以免费看的AV| 色九九九| 亚洲素人无码| 日韩在线观看视频网站| 亚洲三级网站| 午夜久| 97自拍| 免费一级无码婬片A片AAA毛片| 欧美aaa在线| 亚洲欧美视频在线| 就爱操逼网| 香蕉视频毛片| www.天天日| 97人妻一区二区精品免费视频| 韩国三级HD久久精品HD| 人人色人人| 亚洲黄色在线免费观看| 91露脸熟女四川熟女在线观看 | 色片免费| 亚洲有码在线播放| av在线影院| 99re超碰| 日本不卡一区二区| 综合色五月| 久操人妻| 欧美A视频在线观看| 久久久久久久久久国产精品| 黄色免费在线观看视频| 艹逼视频免费观看| 影音先锋AV在线资源| 青春草视频| 五月天婷婷无码| 男人天堂无码| 中文字幕一二三| 在线观看的av网站| 黄色激情网站| 在线亚洲欧洲| 国产对白在线| 亚洲丁香五月天| 超碰天天爱| 北条麻妃在线播放一区| 亚洲综合中文字幕在线| 秋霞午夜成人无码精品| a√天堂资源中文8| 精品自拍视频| 啪啪成人视频| 亚洲国产三级片| 91白浆肆意四溢456| 无码草逼| 无码视频在线| 91人妻人人澡人人爽人人玩| 一级免费黄色电影| 欧美视频久久| 亚洲欧美在线观看视频| 欧美国产日韩视频| 粉嫩小泬BBBBBB免费| 国产三级日本三级国产三级| 91无码高清| 欧美怡春院| 久久AV网站| 丝袜无码| 免费日比视频| 成人网站在线免费看| 一区二区三区四区无码在线| 老司机免费福利视频| 国产AV无码一区| 久久久久久穴| 亚洲天堂av在线免费观看| 欧美日韩一级电影| 在线观看AV91| 日韩视频网址| 亚洲色播放| 婷婷久久综合久| 高清无码在线免费观看| 天天插天天狠| 国产熟妇搡BBBB搡BBBB毛片| 国产高清a| 天天爽天天操| 亚洲一| 国产有码| 日本黄色A片免费看| 強姦婬片A片AAA毛片Mⅴ| 国产成人h| 蜜桃av秘无码一区二区| 日韩人妻系列| 99色视频| 91丝袜在线| 国产美女高潮| 国产熟睡乱子伦午夜视频_第1集| 久久免费观看视频| 色欲精品| 日韩超碰| 久久香蕉网站| 在线观看高清无码| 色婷婷小说| 国产黄色大片| 人人干97| A一级黄色| 黄色小说视频网站| 国产一级免费在线观看| 一级特黄AA片| 国产精品一级A片| 91人妻在线视频| 中文字幕日韩乱伦| 另类罕见稀奇videos| 人妻综合第一页| 欧美三级片网址| 99久久丫e6| 69超碰| 免费无码蜜臀在线观看| 久久6| 国产网址| 亚洲第一香蕉视频| 91无码在线视频| 性欧美丰满熟妇XXXX性久久久| 日韩精品一区在线观看| 人人摸人人操人人看| 最近最好的2019中文| 国产A级毛片久久久久久| 亚洲无码第一页| 国产草莓视频| 无码一区精品久久久成人| 色老板在线免费观看| 亚洲av性爱| 爱搞搞网| 欧美日韩一级黄片| 天天色天天干天天日| 操逼逼片| 免费操逼| 免费看一级黄色片| 国产无码乱伦内射| 男人天堂V| 黄片网站在线免费观看| 人妻少妇精品| 丰满人妻一区二区三区免费| 黄色亚洲无码| 一曲二曲三曲在线观看中文字| 日韩免费看片| 日本A片在线观看| 国产毛片一区| 色色色成人视频| 成人色综合| 蜜桃精品无码| 大香蕉A片| 东京热无码一区| 成人精品A片免费网站| 午夜视频在线播放| 九九99精品视频| 亚洲中文字幕第一页| 少妇推油呻吟白浆啪啪成人片| 高清操逼| 亚洲免费视频播放| 亚洲AV无码成人片在线| 欧洲精品码一区二区三区免费看| 国产精品久久久久精| 色综合五月婷婷| 麻豆91精品人妻成人无码| 特级西西西88大胆无码| 淫乱人妻| 无码AV一区| 精品久久无码| AV毛片| 亚洲AV无码精品成人| 午夜无码久久| 在线观看黄色视频网站| 免费观看黄色视频网站| 婷婷五月免费视频| 国产成人网| 亚洲激情无码视频| 444444免费高清在线观看电视剧的注意| 豆花视频logo进入官网| 亚洲精品无码免费| 性感欧美美女| 亚洲久久久久久| 天堂国产一区二区三区| 久久久性爱| 国产凹凸视频| 中文免费高清在线| 大香蕉婷婷| 中文字幕日本在线| 内射一区二区三区| 国产精品一区二区三区四区| 国产美女全裸网站| 人妻丰满精品一区二区| 91久久久久久久久18| 亚洲无码高清视频在线| 欧产日产国产swag| 一区二区三区四区在线| 欧美日韩视频在线播放| 18禁看网站| 亚洲色图狠狠撸| 日韩麻豆| 成人免费乱码大片a毛片蜜芽| 北条麻妃无码精品AV怎么看| 国产黄色视频在线免费观看| 逼逼爱| 亚洲福利久久| 陈冠希和张柏芝mv| 大香蕉免费在线观看| 日本精品在线观看视频| 中文字幕在线一区| 北条麻妃av在线播放| 国产精品v欧美精品v日韩精品| 在线看片a| 国产主播在线播放| 亚洲午夜无码久久久| 97色色得| 亚洲久久久久| 麻豆影音先锋| 国产精品做爱| av大香蕉| 一区二区三区成人电影| 国产免费观看视频| 亚洲精品A片| 免费看操逼视频| 91丨熟女丨对白| 国产av小电影| 日本成人中文字幕在线观看| 婷婷伊人| 成人黄色性爱视频| 欧美在线操| 午夜撸一撸| 亚洲三级片视频| 日B视频在线观看| 日本黄色视频。| 亚洲中文字幕在线观看| 久久穴| 国产欧美日韩在线视频| 五月丁香色色网| 亚洲无码视频在线播放| av香蕉| 成人性生活影视av| 久久草在线| 三级片免费网址| 香蕉成人网站在线观看| 亚洲在线中文字幕| 欧美亚洲中文| 黄色毛片在线| 黄色亚洲| 91导航| 亚洲在线成人| 欧美三级在线观看视频| 欧美狠狠干| 自拍偷拍欧美| 91AV视频在线| 日韩做爱网站| 国产18女人水真多免费看| 猛操美女| 欧美一级大香蕉| 丁香五月激情在线| 91AV一区二区三区| 全部视频午夜寂寞| 天天干天天撸| 国产精品乱子伦视频一区二区| 国产成人自拍偷拍视频| 亚洲色成人网站www永久四虎| 天天色播| 婷婷免费视频| 黄片小视频在线观看| 操亚洲| 操操操AV| 翔田千里无码| 日韩黄色一级视频| 色鬼综合| 一区二区三区四区五区| 开心五月色婷婷综合开心网| 无码中文字幕高清| 久久久性爱视频| 无码成人av| 人人草在线观看| 另类BBwBBw| 欧美在线va| 91在线观看18| 欧美手机在线视频| 日韩黄色在线| 日本免费无码| 欧美日韩高清丝袜| 国产91探花| 久久电影五月天| www.大鸡巴| 日韩精品成人无码免费| 超碰人人网| 阿v视频在线观看| 蜜乳AV一区二区三区| 成人片天天看片欧美一级| 成人午夜免费视频| 91在线无码精品秘蜜桃入口| 免费做a爰片77777| 丰滿人妻-区二区三区| 一级特黄大片录像i| 人妻无码中文久久久久专区| 欧美一级在线| 波多野结衣在线无码| 国产黄色免费视频| 亚洲AV成人无码精品| 国产又爽又黄视频| 国产婷婷五月天| 极品美女援交在线| 欧美日屄| 国产av黄色| 色婷婷日韩精品一区二区三区| 国产精品国产三级国产专区53 | 91激情电影| 亚洲成人国产| 男人天堂V| 婷婷五月丁香花| 一级a一级a爰片免费| 亚洲一级黄色大片| 99国产高清| 中文字幕一二三| 日韩精品一区二区三区四区| 久久精品在线观看| 人人操人人插| 色午夜| 国内成人AV| 中国老女人性爱视频| 中文字幕乱码无码人妻系列蜜桃 | 99在线观看视频在线高清| 久久人妻无码| 天堂VA蜜桃一区二区三区| 五月天堂网| 制服丝袜强奸乱伦| 91www| 四色永久成人网站| 日本有码在线| 国产乱子伦无码视频免费| 亚洲福利在线免费观看| 蜜臀成人片| 99久久婷婷国产综合精品青牛牛| 国产精品免费久久影院| 欧美A级视频| 91香蕉视频在线播放| 成人爽a毛片一区二区免费| 婷婷五月天综合| 久久久久久久97| 爱操影院| 91在线永久| 久久午夜无码人妻精品蜜桃冫| 91官网在线观看| 国产精品99久久免费黑人人妻| 伊人丁香| 色逼高清| 中文字幕一级A片免费看| 99re在线观看| 婷婷国产亚洲精品网站| 人人干人人艹| 亚洲午夜无码| 91香蕉麻豆| 午夜福利10000| 亚洲精品资源在线| www.97超碰| 日本中文视频| 伊香蕉大综综综合| 日韩三级在线播放| 九色91PORNY国产| 草久在线观看| 大香蕉大香蕉免费网| 黄片视频在线免费播放| 日韩不卡高清在线观看视频| 日韩一a| 国产视频一区二区在线| 成人免费Av| 青娱乐极品久久| 免费视频一区| 国产黄色录像| 水蜜桃在线视频| 婷婷丁香六月| 中文字幕有码在线| 一级A片在线观看| 亚洲无码福利| 国产精品操逼视频| 国产无码高潮在线| 亚洲sese| 国产精品秘麻豆果冻传媒潘甜甜丶| 一级理论片| 黄频在线观看| www.超碰在线| 97人人爽| 亚洲性爱中文字幕| 免费亲子乱婬一级A片| 欧美成人h| 亚洲视频免费看| 九九国产精品| 性爱视频网站| 图片区视频区小说区| 人妻av在线| 黄色AA片| 一级电影网站| 国产秘久久一区二区| 99热99| 亚洲永久| GOGO人体做爰大胆视频| 伊人网大香蕉| 97人人干| 久久婷婷五月天| 91成人情欲影视网| 国产激情欧洲在线观看一区二区三区 | 91一区在线观看| 日本少妇中文字幕| 日韩AV无码高清| 久草网在线观看| 四虎成人精品永久免费AV九九| 久久久永久免费视频| 婷婷无码成人精品俺来俺去| www.xxx| 少妇人妻av| 无码人妻一区二区三区在线视频不卡| 俺去俺来也在线www色情网| 在线观看视频国产| 国产一級A片免费看| 免费18禁网站| 天堂中文资源库| 一区二区水蜜桃| 国产精品18禁| 婷婷五月情| 欧美成人自拍| 成人黄色电影在线观看| 影音先锋成人AV| 中文字幕第一页在线| 国产免费视频69| 色伊人久操视频| 日本欧洲三级| 操逼资源| 天堂中文资源在线观看| 最近中文字幕在线观看| 成人性爱视频网| 欧美三级一级| 亚州操逼片| 天天视频黄色| 日韩成人高清无码| 五月天无码| 91久久欧美极品XXXXⅩ| 中文久久| 日韩视频一二三| 亚洲精品鲁一鲁一区二区三区| 国内精品人妻无码久久久影院蜜桃| 黄色小视频在线| 影音先锋色AV| 97操| 欧美另类| 日韩一级在线播放| 深爱五月婷婷| 制服.丝袜.亚洲.中文豆花| 青娱乐极品久久| 国产人与禽zoz0性伦| 亚洲无码成人AV| 亚洲最新AV网站| www.豆花福利视频| 插插插菊花综合网| 激情丁香六月| 岛国AV免费看| 天天干天天草| 亚洲无码影视| 色淫视频| 三级毛片网站| 国产无码影视| 一本大道东京热AV| 欧美丰满美乳XXⅩ高潮www | 国产中文自拍| 日本免费无码| 亚洲三级黄色| 成年人黄色片| 中文字幕av在线观看| 草久美女| 干欧美女人| 99久免费视频| 国产成人无码Av片在线公司| 欧美久久视频| 毛多水多丰满女人A片| 久久99深爱久久99精品| 亚洲精品成人无码| 蜜桃精品久久久| 中文在线不卡| 日韩免费精品视频| 麻豆精品一区| 久久久久亚洲AV成人网人人软件| 在线观看91| 中文字幕片av| 在线91视频| 国产一区久久| 亚洲秘av无码一区二区| 国产成人a亚洲精品无码| 欧美视频二区| 国产婷婷| 日韩性爱视频在线播放| 日本中文字幕精品| 成人福利视频在线观看| 免费黄色视频网址| 成人免费网站| 青青操在线视频| 国产欧美在线观看不卡| 影音先锋三区| 蜜臀久久久99久久久久久久| 色先锋av| 免费日逼视频| 狼友视频一国产| 成人香蕉网| 久久AV电影| 影音先锋无码一区| 黄色AV电影| 久久婷婷婬片A片AAA| 国产不卡视频| 无码啪啪啪| 网址你懂的| 欧美在线观看一区二区| 午夜在线无码| 亚洲无吗在线观看| 97人妻精品一区二区三区免| 婷婷精品免费久久| 香蕉视频在线看| 一级AA片| 安徽妇搡BBBB搡BBB| 日本精品视频一区二区| 欧美毛片在线观看| 欧美国产日韩在线| 亚洲午夜福利| 最新av网| 美女性爱3P视频| 97精品人人A片免费看| 国产女人在线视频| 18AV在线观看| 激情AV在线| 三级乱伦86丝袜无码| 日本黄色A片| 日韩一级一级一级| 热久精品| 亚洲中文久久| 操大香蕉| 四川少妇BBB凸凸凸BBB安慰我| 欧一美一婬一伦一区二区三区自慰国| 操逼爽| 免费久久久| 无码一级片| 性欧美XXXX| 永久m3u8在线观看| 婷婷五月天小说| 一本大道东京热av无码| A片在线免费播放| 2019天天操| 精品成人免费视频| 成人网站免费在线| 影音先锋成人视频| 国产69精品久久| 最近中文字幕在线| 伊人色五月| 中文字幕亚洲区| 大香蕉久久久| 欧美激情中文字幕| 日韩无码少妇| 中文字幕第2页| 一区二区三区欧美| 五月婷婷中文版| 午夜福利100理论片| 国产一级A片久久久免费看快餐| 一区二区三区在线观看| 狠狠操网站| 2019中文字幕在线| 亚洲成人a| 婷婷社区五月天| 久久久亚洲无码| 久久午夜成人电影| 噜噜噜在线视频| 韩日一区| 色香蕉视频在线观看| 年轻女教师高潮2| 蜜桃视频在线入口www| 大香蕉综合在线| 亚洲中文字幕日本| 色老板最新地址| 强辱丰满人妻HD中文字幕| 久久永久免费精品人妻专区| 国精品无码人妻一区二区三区| 成年女人免费视频| 狠狠网| 西西444WWW无码大胆在线观看| 少妇搡BBBB搡BBB搡造水多/ | 最近中文字幕高清2019中文字幕| 99天天视频| 韩国三级HD久久精品HD| 精品人妻一区二区| 国产女人免费| 青青草娱乐视频| 日本親子亂子倫XXXX50路| 国产又粗又大又爽91嫩草| 家庭乱伦影视| 亚洲中文字幕在线视频观看| 亚洲性片| 超碰成人在线观看| 久久久www成人免费毛片| 亚洲超碰在线观看| 自慰精品| 欧美视频手机在线| 影音先锋久久久| 国产精品99久久久久的广告情况| 精品看片| 日韩十八禁网站| 婷婷五月丁香网| 日韩人妻精品一区二区| 手机av在线观看| 日韩第一页| 人人妻人人玩澡人人爽| 操逼操逼视频| 无码A片| 久久高清亚洲| 久久黄色大片| yw在线观看| 老司机午夜免费精品视频| 亚洲AV无一区二区三区久久| 亚洲色图在线观看| 亚洲乱伦av| 天天艹天天| www一级片| 一区二区三区四区在线| 在线免费观看成人网站| 欧美精品18| 日本东京热视频| 久久久久久大香蕉| 亚洲在线视频免费观看| 大奶一区二区| 另类老妇性BBwBBw图片| 波多野结衣在线无码| 国内自拍2025| 成人蜜臀AV| 亚洲第一视频在线观看| 51成人网站免费| 国精品无码一区二区三区在线秋菊| 91秦先生在线播放| 黄色一级视频| 美日韩AV| 人人操av| 午夜成人国产| 国产办公室丝袜人妖| 青娱乐AV| 国产91久久婷婷一区二区| 国产精品色色| 五月婷婷性爱| AV网站免费在线观看| 国产精品视频播放| 99天堂网| 免费看无码| www.17c嫩嫩草色蜜桃网站| 北条麻妃无码一区三区| 久久肏屄|