国产秋霞理论久久久电影-婷婷色九月综合激情丁香-欧美在线观看乱妇视频-精品国avA久久久久久久-国产乱码精品一区二区三区亚洲人-欧美熟妇一区二区三区蜜桃视频

【硬剛大數(shù)據(jù)】從零到大數(shù)據(jù)專家面試篇之SparkSQL篇

共 39321字,需瀏覽 79分鐘

 ·

2021-08-20 12:07

本文是對《【硬剛大數(shù)據(jù)之學習路線篇】2021年從零到大數(shù)據(jù)專家的學習指南(全面升級版)》的面試部分補充。

部分文章:

1.談談你對Spark SQL的理解

Spark SQL是一個用來處理結(jié)構(gòu)化數(shù)據(jù)的Spark組件,前身是shark,但是shark過多的依賴于hive如采用hive的語法解析器、查詢優(yōu)化器等,制約了Spark各個組件之間的相互集成,因此Spark SQL應運而生。
Spark SQL在汲取了shark諸多優(yōu)勢如內(nèi)存列存儲、兼容hive等基礎上,做了重新的構(gòu)造,因此也擺脫了對hive的依賴,但同時兼容hive。除了采取內(nèi)存列存儲優(yōu)化性能,還引入了字節(jié)碼生成技術、CBO和RBO對查詢等進行動態(tài)評估獲取最優(yōu)邏輯計劃、物理計劃執(zhí)行等。基于這些優(yōu)化,使得Spark SQL相對于原有的SQL on Hadoop技術在性能方面得到有效提升。
同時,Spark SQL支持多種數(shù)據(jù)源,如JDBC、HDFS、HBase。它的內(nèi)部組件,如SQL的語法解析器、分析器等支持重定義進行擴展,能更好的滿足不同的業(yè)務場景。與Spark Core無縫集成,提供了DataSet/DataFrame的可編程抽象數(shù)據(jù)模型,并且可被視為一個分布式的SQL查詢引擎。

2.談談你對DataSet/DataFrame的理解

DataSet/DataFrame都是Spark SQL提供的分布式數(shù)據(jù)集,相對于RDD而言,除了記錄數(shù)據(jù)以外,還記錄表的schema信息。
DataSet是自Spark1.6開始提供的一個分布式數(shù)據(jù)集,具有RDD的特性比如強類型、可以使用強大的lambda表達式,并且使用Spark SQL的優(yōu)化執(zhí)行引擎。DataSet API支持Scala和Java語言,不支持Python。但是鑒于Python的動態(tài)特性,它仍然能夠受益于DataSet API(如,你可以通過一個列名從Row里獲取這個字段 row.columnName),類似的還有R語言。
DataFrame是DataSet以命名列方式組織的分布式數(shù)據(jù)集,類似于RDBMS中的表,或者R和Python中的 data frame。DataFrame API支持Scala、Java、Python、R。在Scala API中,DataFrame變成類型為Row的Dataset:type DataFrame = Dataset[Row]。
DataFrame在編譯期不進行數(shù)據(jù)中字段的類型檢查,在運行期進行檢查。但DataSet則與之相反,因為它是強類型的。此外,二者都是使用catalyst進行sql的解析和優(yōu)化。為了方便,以下統(tǒng)一使用DataSet統(tǒng)稱。
DataSet創(chuàng)建
DataSet通常通過加載外部數(shù)據(jù)或通過RDD轉(zhuǎn)化創(chuàng)建。
  • 1.加載外部數(shù)據(jù) 以加載json和mysql為例:

val ds = sparkSession.read.json("/路徑/people.json")

val ds = sparkSession.read.format("jdbc")
.options(Map("url" -> "jdbc:mysql://ip:port/db",
"driver" -> "com.mysql.jdbc.Driver",
"dbtable" -> "tableName", "user" -> "root", "root" -> "123")).load()
  • 2.RDD轉(zhuǎn)換為DataSet 通過RDD轉(zhuǎn)化創(chuàng)建DataSet,關鍵在于為RDD指定schema,通常有兩種方式(偽代碼):

1.定義一個case class,利用反射機制來推斷

1) 從HDFS中加載文件為普通RDD
val lineRDD = sparkContext.textFile("hdfs://ip:port/person.txt").map(_.split(" "))

2) 定義case class(相當于表的schema)
case class Person(id:Int, name:String, age:Int)

3) 將RDD和case class關聯(lián)
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

4) 將RDD轉(zhuǎn)換成DataFrame
val ds= personRDD.toDF

2.手動定義一個schema StructType,直接指定在RDD上

val schemaString ="name age"

val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

val rowRdd = peopleRdd.map(p=>Row(p(0),p(1)))

val ds = sparkSession.createDataFrame(rowRdd,schema)
操作DataSet的兩種風格語法
DSL語法
1.查詢DataSet部分列中的內(nèi)容
personDS.select(col("name"))
personDS.select(col("name"), col("age"))
2.查詢所有的name和age和salary,并將salary加1000
personDS.select(col("name"), col("age"), col("salary") + 1000)
personDS.select(personDS("name"), personDS("age"), personDS("salary") + 1000)
3.過濾age大于18的
personDS.filter(col("age") > 18)
4.按年齡進行分組并統(tǒng)計相同年齡的人數(shù)
personDS.groupBy("age").count()
注意:直接使用col方法需要import org.apache.spark.sql.functions._
SQL語法
如果想使用SQL風格的語法,需要將DataSet注冊成表
personDS.registerTempTable("person")
//查詢年齡最大的前兩名
val result = sparkSession.sql("select * from person order by age desc limit 2")
//保存結(jié)果為json文件。注意:如果不指定存儲格式,則默認存儲為parquet
result.write.format("json").save("hdfs://ip:port/res2")

3.說說Spark SQL的幾種使用方式

  • 1.sparksql-shell交互式查詢

就是利用Spark提供的shell命令行執(zhí)行SQL
  • 2.編程

首先要獲取Spark SQL編程"入口":SparkSession(當然在早期版本中大家可能更熟悉的是SQLContext,如果是操作hive則為HiveContext)。這里以讀取parquet為例:
val spark = SparkSession.builder()
.appName("example").master("local[*]").getOrCreate();
val df = sparkSession.read.format("parquet").load("/路徑/parquet文件")
然后就可以針對df進行業(yè)務處理了。
  • 3.Thriftserver

beeline客戶端連接操作 啟動spark-sql的thrift服務,sbin/start-thriftserver.sh,啟動腳本中配置好Spark集群服務資源、地址等信息。然后通過beeline連接thrift服務進行數(shù)據(jù)處理。hive-jdbc驅(qū)動包來訪問spark-sql的thrift服務 在項目pom文件中引入相關驅(qū)動包,跟訪問mysql等jdbc數(shù)據(jù)源類似。示例:
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://ip:port", "root", "123");
try {
val stat = conn.createStatement()
val res = stat.executeQuery("select * from people limit 1")
while (res.next()) {
println(res.getString("name"))
}
} catch {
case e: Exception => e.printStackTrace()
} finally{
if(conn!=null) conn.close()
}

4.說說Spark SQL 獲取Hive數(shù)據(jù)的方式

Spark SQL讀取hive數(shù)據(jù)的關鍵在于將hive的元數(shù)據(jù)作為服務暴露給Spark。除了通過上面thriftserver jdbc連接hive的方式,也可以通過下面這種方式:
首先,配置 $HIVE_HOME/conf/hive-site.xml,增加如下內(nèi)容:
<property>
<name>hive.metastore.uris</name>
<value>thrift://ip:port</value>
</property>
然后,啟動hive metastore
最后,將hive-site.xml復制或者軟鏈到$SPARK_HOME/conf/。如果hive的元數(shù)據(jù)存儲在mysql中,那么需要將mysql的連接驅(qū)動jar包如mysql-connector-java-5.1.12.jar放到$SPARK_HOME/lib/下,啟動spark-sql即可操作hive中的庫和表。而此時使用hive元數(shù)據(jù)獲取SparkSession的方式為:
val spark = SparkSession.builder()
.config(sparkConf).enableHiveSupport().getOrCreate()

5.分別說明UDF、UDAF、Aggregator

  • UDF UDF是最基礎的用戶自定義函數(shù),以自定義一個求字符串長度的udf為例:

    val udf_str_length = udf{(str:String) => str.length}
    spark.udf.register("str_length",udf_str_length)
    val ds =sparkSession.read.json("路徑/people.json")
    ds.createOrReplaceTempView("people")
    sparkSession.sql("select str_length(address) from people")
  • UDAF 定義UDAF,需要繼承抽象類UserDefinedAggregateFunction,它是弱類型的,下面的aggregator是強類型的。以求平均數(shù)為例:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

object MyAverage extends UserDefinedAggregateFunction {
// Data types of input arguments of this aggregate function
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
// Data types of values in the aggregation buffer
def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// The data type of the returned value
def dataType: DataType = DoubleType
// Whether this function always returns the same output on the identical input
def deterministic: Boolean = true
// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
// the opportunity to update its values. Note that arrays and maps inside the buffer are still
// immutable.
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
// Updates the given aggregation buffer `buffer` with new input data from `input`
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// Calculates the final result
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function to access it
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
  • Aggregator

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// Merge two intermediate values
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// Transform the output of the reduction
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[Average] = Encoders.product
// Specifies the Encoder for the final output value type
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()

6.對比一下Spark SQL與HiveSQL

7.說說Spark SQL解析查詢parquet格式Hive表如何獲取分區(qū)字段和查詢條件

問題現(xiàn)象
sparksql加載指定Hive分區(qū)表路徑,生成的DataSet沒有分區(qū)字段。如,sparkSession.read.format("parquet").load(s"${hive_path}"),hive_path為Hive分區(qū)表在HDFS上的存儲路徑。
hive_path的幾種指定方式會導致這種情況的發(fā)生(test_partition是一個Hive外部分區(qū)表,dt是它的分區(qū)字段,分區(qū)數(shù)據(jù)有dt為20200101和20200102):
1.hive_path為"/spark/dw/test.db/test_partition/dt=20200101"
2.hive_path為"/spark/dw/test.db/test_partition/*"
因為牽涉到的源碼比較多,這里僅以示例的程序中涉及到的源碼中的class、object和方法,繪制成xmind圖如下,想細心研究的可以參考該圖到spark源碼中進行分析。

問題分析
這里主要給出幾個源碼段,結(jié)合上述xmind圖理解:
在沒有指定參數(shù)basePath的情況下:
1.hive_path為/spark/dw/test.db/test_partition/dt=20200101
sparksql底層處理后得到的basePaths: Set(new Path(“/spark/dw/test.db/test_partition/dt=20200101”))【偽代碼】

leafDirs: Seq(new Path(“/spark/dw/test.db/test_partition/dt=20200101”))【偽代碼】
2.hive_path為/spark/dw/test.db/test_partition/*
sparksql底層處理后得到的basePaths: Set(new Path(“/spark/dw/test.db/test_partition/dt=20200101”),new Path(“/spark/dw/test.db/test_partition/dt=20200102”))【偽代碼】

leafDirs: Seq(new Path(“/spark/dw/test.db/test_partition/dt=20200101”),new Path(“/spark/dw/test.db/test_partition/dt=20200102”))【偽代碼】
這兩種情況導致源碼if(basePaths.contains(currentPath))為true,還沒有解析分區(qū)就重置變量finished為true跳出循環(huán),因此最終生成的結(jié)果也就沒有分區(qū)字段:
解決方案
  • 1.在Spark SQL加載Hive表數(shù)據(jù)路徑時,指定參數(shù)basePath,如 sparkSession.read.option("basePath","/spark/dw/test.db/test_partition")

  • 2.主要重寫basePaths方法和parsePartition方法中的處理邏輯,同時需要修改其他涉及的代碼。由于涉及需要改寫的代碼比較多,可以封裝成工具

8.說說你對Spark SQL 小文件問題處理的理解

在生產(chǎn)中,無論是通過SQL語句或者Scala/Java等代碼的方式使用Spark SQL處理數(shù)據(jù),在Spark SQL寫數(shù)據(jù)時,往往會遇到生成的小文件過多的問題,而管理這些大量的小文件,是一件非常頭疼的事情。
大量的小文件會影響Hadoop集群管理或者Spark在處理數(shù)據(jù)時的穩(wěn)定性:
1.Spark SQL寫Hive或者直接寫入HDFS,過多的小文件會對NameNode內(nèi)存管理等產(chǎn)生巨大的壓力,會影響整個集群的穩(wěn)定運行
2.容易導致task數(shù)過多,如果超過參數(shù)spark.driver.maxResultSize的配置(默認1g),會拋出類似如下的異常,影響任務的處理
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 478 tasks (2026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
當然可以通過調(diào)大spark.driver.maxResultSize的默認配置來解決問題,但如果不能從源頭上解決小文件問題,以后還可能遇到類似的問題。此外,Spark在處理任務時,一個分區(qū)分配一個task進行處理,多個分區(qū)并行處理,雖然并行處理能夠提高處理效率,但不是意味著task數(shù)越多越好。如果數(shù)據(jù)量不大,過多的task運行反而會影響效率。最后,Spark中一個task處理一個分區(qū)從而也會影響最終生成的文件數(shù)。
在數(shù)倉建設中,產(chǎn)生小文件過多的原因有很多種,比如:
1.流式處理中,每個批次的處理執(zhí)行保存操作也會產(chǎn)生很多小文件 2.為了解決數(shù)據(jù)更新問題,同一份數(shù)據(jù)保存了不同的幾個狀態(tài),也容易導致文件數(shù)過多
那么如何解決這種小文件的問題呢?
  • 1.通過repartition或coalesce算子控制最后的DataSet的分區(qū)數(shù) 注意repartition和coalesce的區(qū)別

  • 2.將Hive風格的Coalesce and Repartition Hint 應用到Spark SQL 需要注意這種方式對Spark的版本有要求,建議在Spark2.4.X及以上版本使用,示例:

    INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
    INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...
  • 3.小文件定期合并可以定時通過異步的方式針對Hive分區(qū)表的每一個分區(qū)中的小文件進行合并操作

上述只是給出3種常見的解決辦法,并且要結(jié)合實際用到的技術和場景去具體處理,比如對于HDFS小文件過多,也可以通過生成HAR 文件或者Sequence File來解決。

9.SparkSQL讀寫Hive metastore Parquet遇到過什么問題嗎?

Spark SQL為了更好的性能,在讀寫Hive metastore parquet格式的表時,會默認使用自己的Parquet SerDe,而不是采用Hive的SerDe進行序列化和反序列化。該行為可以通過配置參數(shù)spark.sql.hive.convertMetastoreParquet進行控制,默認true。
這里從表schema的處理角度而言,就必須注意Hive和Parquet兼容性,主要有兩個區(qū)別:1.Hive是大小寫敏感的,但Parquet相反 2.Hive會將所有列視為nullable,但是nullability在parquet里有獨特的意義
由于上面的原因,在將Hive metastore parquet轉(zhuǎn)化為Spark SQL parquet時,需要兼容處理一下Hive和Parquet的schema,即需要對二者的結(jié)構(gòu)進行一致化。主要處理規(guī)則是:
1.有相同名字的字段必須要有相同的數(shù)據(jù)類型,忽略nullability。兼容處理的字段應該保持Parquet側(cè)的數(shù)據(jù)類型,這樣就可以處理到nullability類型了(空值問題) 2.兼容處理的schema應只包含在Hive元數(shù)據(jù)里的schema信息,主要體現(xiàn)在以下兩個方面:(1)只出現(xiàn)在Parquet schema的字段會被忽略 (2)只出現(xiàn)在Hive元數(shù)據(jù)里的字段將會被視為nullable,并處理到兼容后的schema中
關于schema(或者說元數(shù)據(jù)metastore),Spark SQL在處理Parquet表時,同樣為了更好的性能,會緩存Parquet的元數(shù)據(jù)信息。此時,如果直接通過Hive或者其他工具對該Parquet表進行修改導致了元數(shù)據(jù)的變化,那么Spark SQL緩存的元數(shù)據(jù)并不能同步更新,此時需要手動刷新Spark SQL緩存的元數(shù)據(jù),來確保元數(shù)據(jù)的一致性,方式如下:
// 第一種方式應用的比較多
1. sparkSession.catalog.refreshTable(s"${dbName.tableName}")
2. sparkSession.catalog.refreshByPath(s"${path}")

10.說說Spark SQL如何選擇join策略

在了解join策略選擇之前,首先看幾個先決條件:
  • 1. build table的選擇

Hash Join的第一步就是根據(jù)兩表之中較小的那一個構(gòu)建哈希表,這個小表就叫做build table,大表則稱為probe table,因為需要拿小表形成的哈希表來"探測"它。源碼如下:
/* 左表作為build table的條件,join類型需滿足:
1. InnerLike:實現(xiàn)目前包括inner join和cross join
2. RightOuter:right outer join
*/
private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
case _: InnerLike | RightOuter => true
case _ => false
}

/* 右表作為build table的條件,join類型需滿足(第1種是在業(yè)務開發(fā)中寫的SQL主要適配的):
1. InnerLike、LeftOuter(left outer join)、LeftSemi(left semi join)、LeftAnti(left anti join)
2. ExistenceJoin:only used in the end of optimizer and physical plans, we will not generate SQL for this join type
*/
private def canBuildRight(joinType: JoinType): Boolean = joinType match {
case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
case _ => false
}
  • 2. 滿足什么條件的表才能被廣播

如果一個表的大小小于或等于參數(shù)spark.sql.autoBroadcastJoinThreshold(默認10M)配置的值,那么就可以廣播該表。源碼如下:
private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
: Boolean = {
val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
val buildRight = canBuildRight(joinType) && canBroadcast(right)
buildLeft || buildRight
}

private def canBroadcast(plan: LogicalPlan): Boolean = {
plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}

private def broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
: BuildSide = {
val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
val buildRight = canBuildRight(joinType) && canBroadcast(right)

// 最終會調(diào)用broadcastSide
broadcastSide(buildLeft, buildRight, left, right)
}
除了通過上述表的大小滿足一定條件之外,我們也可以通過直接在Spark SQL中顯示使用hint方式(/+ BROADCAST(small_table) /),直接指定要廣播的表,源碼如下:
private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
: Boolean = {
val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
buildLeft || buildRight
}

private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
: BuildSide = {
val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast

// 最終會調(diào)用broadcastSide
broadcastSide(buildLeft, buildRight, left, right)
}
無論是通過表大小進行廣播還是根據(jù)是否指定hint進行表廣播,最終都會調(diào)用broadcastSide,來決定應該廣播哪個表:
private def broadcastSide(
canBuildLeft: Boolean,
canBuildRight: Boolean,
left: LogicalPlan,
right: LogicalPlan): BuildSide = {

def smallerSide =
if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft

if (canBuildRight && canBuildLeft) {
// 如果左表和右表都能作為build table,則將根據(jù)表的統(tǒng)計信息,確定physical size較小的表作為build table(即使兩個表都被指定了hint)
smallerSide
} else if (canBuildRight) {
// 上述條件不滿足,優(yōu)先判斷右表是否滿足build條件,滿足則廣播右表。否則,接著判斷左表是否滿足build條件
BuildRight
} else if (canBuildLeft) {
BuildLeft
} else {
// 如果左表和右表都不能作為build table,則將根據(jù)表的統(tǒng)計信息,確定physical size較小的表作為build table。目前主要用于broadcast nested loop join
smallerSide
}
}
從上述源碼可知,即使用戶指定了廣播hint,實際執(zhí)行時,不一定按照hint的表進行廣播。
  • 3. 是否可構(gòu)造本地HashMap

應用于Shuffle Hash Join中,源碼如下:
// 邏輯計劃的單個分區(qū)足夠小到構(gòu)建一個hash表
// 注意:要求分區(qū)數(shù)是固定的。如果分區(qū)數(shù)是動態(tài)的,還需滿足其他條件
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
// 邏輯計劃的physical size小于spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions(默認200)時,即可構(gòu)造本地HashMap
plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
SparkSQL目前主要實現(xiàn)了3種join:Broadcast Hash Join、ShuffledHashJoin、Sort Merge Join。那么Catalyst在處理SQL語句時,是依據(jù)什么規(guī)則進行join策略選擇的呢?
1. Broadcast Hash Join
主要根據(jù)hint和size進行判斷是否滿足條件。
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastByHints(joinType, left, right) =>
val buildSide = broadcastSideByHints(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

// broadcast hints were not specified, so need to infer it from size and configuration.
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastBySizes(joinType, left, right) =>
val buildSide = broadcastSideBySizes(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
2. Shuffle Hash Join
選擇Shuffle Hash Join需要同時滿足以下條件:
  1. spark.sql.join.preferSortMergeJoin為false,即Shuffle Hash Join優(yōu)先于Sort Merge Join

  2. 右表或左表是否能夠作為build table

  3. 是否能構(gòu)建本地HashMap

  4. 以右表為例,它的邏輯計劃大小要遠小于左表大小(默認3倍)

上述條件優(yōu)先檢查右表。
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
&& muchSmaller(right, left) ||
!RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))

case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildLeft(joinType) && uildLocalHashMap(left)
&& muchSmaller(left, right) ||
!RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))

private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
}
如果不滿足上述條件,但是如果參與join的表的key無法被排序,即無法使用Sort Merge Join,最終也會選擇Shuffle Hash Join。
!RowOrdering.isOrderable(leftKeys)

def isOrderable(exprs: Seq[Expression]): Boolean = exprs.forall(e => isOrderable(e.dataType))
3. Sort Merge Join
如果上面兩種join策略(Broadcast Hash Join和Shuffle Hash Join)都不符合條件,并且參與join的key是可排序的,就會選擇Sort Merge Join。
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
joins.SortMergeJoinExec(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
4. Without joining keys
Broadcast Hash Join、Shuffle Hash Join和Sort Merge Join都屬于經(jīng)典的ExtractEquiJoinKeys(等值連接條件)。
對于非ExtractEquiJoinKeys,則會優(yōu)先檢查表是否可以被廣播(hint或者size)。如果可以,則會使用BroadcastNestedLoopJoin(簡稱BNLJ),熟悉Nested Loop Join則不難理解BNLJ,主要卻別在于BNLJ加上了廣播表。
源碼如下:
// Pick BroadcastNestedLoopJoin if one side could be broadcast
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastByHints(joinType, left, right) =>
val buildSide = broadcastSideByHints(joinType, left, right)
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

case j @ logical.Join(left, right, joinType, condition)
if canBroadcastBySizes(joinType, left, right) =>
val buildSide = broadcastSideBySizes(joinType, left, right)
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
如果表不能被廣播,又細分為兩種情況:
  1. 若join類型InnerLike(關于InnerLike上面已有介紹)對量表直接進行笛卡爾積處理若

  2. 上述情況都不滿足,最終方案是選擇兩個表中physical size較小的表進行廣播,join策略仍為BNLJ

源碼如下:
// Pick CartesianProduct for InnerJoin
case logical.Join(left, right, _: InnerLike, condition) =>
joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

case logical.Join(left, right, joinType, condition) =>
val buildSide = broadcastSide(
left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
// This join could be very slow or OOM
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

11.講講Spark SQL中Not in Subquery為何低效以及如何規(guī)避

首先看個Not in Subquery的SQL:
// test_partition1 和 test_partition2為Hive外部分區(qū)表
select * from test_partition1 t1 where t1.id not in (select id from test_partition2);
對應的完整的邏輯計劃和物理計劃為:
== Parsed Logical Plan ==
'Project [*]
+- 'Filter NOT 't1.id IN (list#3 [])
: +- 'Project ['id]
: +- 'UnresolvedRelation `test_partition2`
+- 'SubqueryAlias `t1`
+- 'UnresolvedRelation `test_partition1`

== Analyzed Logical Plan ==
id: string, name: string, dt: string
Project [id#4, name#5, dt#6]
+- Filter NOT id#4 IN (list#3 [])
: +- Project [id#7]
: +- SubqueryAlias `default`.`test_partition2`
: +- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]
+- SubqueryAlias `t1`
+- SubqueryAlias `default`.`test_partition1`
+- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]

== Optimized Logical Plan ==
Join LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7)))
:- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
+- Project [id#7]
+- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#7, name#8], [dt#9]

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#4 = id#7) || isnull((id#4 = id#7)))
:- Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
+- BroadcastExchange IdentityBroadcastMode
+- Scan hive default.test_partition2 [id#7], HiveTableRelation `default
通過上述邏輯計劃和物理計劃可以看出,Spark SQL在對not in subquery處理,從邏輯計劃轉(zhuǎn)換為物理計劃時,會最終選擇BroadcastNestedLoopJoin(對應到Spark源碼中BroadcastNestedLoopJoinExec.scala)策略。
提起B(yǎng)roadcastNestedLoopJoin,不得不提Nested Loop Join,它在很多RDBMS中得到應用,比如mysql。它的工作方式是循環(huán)從一張表(outer table)中讀取數(shù)據(jù),然后訪問另一張表(inner table,通常有索引),將outer表中的每一條數(shù)據(jù)與inner表中的數(shù)據(jù)進行join,類似一個嵌套的循環(huán)并且在循環(huán)的過程中進行數(shù)據(jù)的比對校驗是否滿足一定條件。
對于被連接的數(shù)據(jù)集較小的情況下,Nested Loop Join是個較好的選擇。但是當數(shù)據(jù)集非常大時,從它的執(zhí)行原理可知,效率會很低甚至可能影響整個服務的穩(wěn)定性。
而Spark SQL中的BroadcastNestedLoopJoin就類似于Nested Loop Join,只不過加上了廣播表(build table)而已。
BroadcastNestedLoopJoin是一個低效的物理執(zhí)行計劃,內(nèi)部實現(xiàn)將子查詢(select id from test_partition2)進行廣播,然后test_partition1每一條記錄通過loop遍歷廣播的數(shù)據(jù)去匹配是否滿足一定條件。
private def leftExistenceJoin(
// 廣播的數(shù)據(jù)
relation: Broadcast[Array[InternalRow]],
exists: Boolean): RDD[InternalRow] = {
assert(buildSide == BuildRight)

/* streamed對應物理計劃中:
Scan hive default.test_partition1 [id#4, name#5, dt#6], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#4, name#5], [dt#6]
*/
streamed.execute().mapPartitionsInternal { streamedIter =>
val buildRows = relation.value
val joinedRow = new JoinedRow

// 條件是否定義。此處為Some(((id#4 = id#7) || isnull((id#4 = id#7))))
if (condition.isDefined) {
streamedIter.filter(l =>
// exists主要是為了根據(jù)joinType來進一步條件判斷數(shù)據(jù)的返回與否,此處joinType為LeftAnti
buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists
)

// else
} else if (buildRows.nonEmpty == exists) {
streamedIter
} else {
Iterator.empty
}
}
}
由于BroadcastNestedLoopJoin的低效率執(zhí)行,可能導致長時間占用executor資源,影響集群性能。同時,因為子查詢的結(jié)果集要進行廣播,如果數(shù)據(jù)量特別大,對driver端也是一個嚴峻的考驗,極有可能帶來OOM的風險。因此,在實際生產(chǎn)中,要盡可能利用其他效率相對高的SQL來避免使用Not in Subquery。
雖然通過改寫Not in Subquery的SQL,進行低效率的SQL到高效率的SQL過渡,能夠避免上面所說的問題。但是這往往建立在我們發(fā)現(xiàn)任務執(zhí)行慢甚至失敗,然后排查任務中的SQL,發(fā)現(xiàn)"問題"SQL的前提下。那么如何在任務執(zhí)行前,就"檢查"出這樣的SQL,從而進行提前預警呢?
這里給出一個思路,就是解析Spark SQL計劃,根據(jù)Spark SQL的join策略匹配條件等,來判斷任務中是否使用了低效的Not in Subquery進行預警,然后通知業(yè)務方進行修改。同時,我們在實際完成數(shù)據(jù)的ETL處理等分析時,也要事前避免類似的低性能SQL。

12.說說SparkSQL中產(chǎn)生笛卡爾積的幾種典型場景以及處理策略

Spark SQL幾種產(chǎn)生笛卡爾積的典型場景
首先來看一下在Spark SQL中產(chǎn)生笛卡爾積的幾種典型SQL:
  1. join語句中不指定on條件

    select * from test_partition1 join test_partition2;
  2. join語句中指定不等值連接

    select * from test_partition1 t1 inner join test_partition2 t2 on t1.name <> t2.name;
  3. join語句on中用or指定連接條件

    select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id or t1.name = t2.name;
  4. join語句on中用||指定連接條件

    select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.id || t1.name = t2.name;

    除了上述舉的幾個典型例子,實際業(yè)務開發(fā)中產(chǎn)生笛卡爾積的原因多種多樣。

同時需要注意,在一些SQL中即使?jié)M足了上述4種規(guī)則之一也不一定產(chǎn)生笛卡爾積。比如,對于join語句中指定不等值連接條件的下述SQL不會產(chǎn)生笛卡爾積:
--在Spark SQL內(nèi)部優(yōu)化過程中針對join策略的選擇,最終會通過SortMergeJoin進行處理。
select * from test_partition1 t1 join test_partition2 t2 on t1.id = t2.i
此外,對于直接在SQL中使用cross join的方式,也不一定產(chǎn)生笛卡爾積。比如下述SQL:
-- Spark SQL內(nèi)部優(yōu)化過程中選擇了SortMergeJoin方式進行處理
select * from test_partition1 t1 cross join test_partition2 t2 on t1.id = t2.id;
但是如果cross join沒有指定on條件同樣會產(chǎn)生笛卡爾積。那么如何判斷一個SQL是否產(chǎn)生了笛卡爾積呢?
Spark SQL是否產(chǎn)生了笛卡爾積
以join語句不指定on條件產(chǎn)生笛卡爾積的SQL為例:
-- test_partition1和test_partition2是Hive分區(qū)表
select * from test_partition1 join test_partition2;
通過Spark UI上SQL一欄查看上述SQL執(zhí)行圖,如下:
可以看出,因為該join語句中沒有指定on連接查詢條件,導致了CartesianProduct即笛卡爾積。
再來看一下該join語句的邏輯計劃和物理計劃:
可以看出,因為該join語句中沒有指定on連接查詢條件,導致了CartesianProduct即笛卡爾積。
再來看一下該join語句的邏輯計劃和物理計劃:
== Parsed Logical Plan ==
'GlobalLimit 1000
+- 'LocalLimit 1000
+- 'Project [*]
+- 'UnresolvedRelation `t`

== Analyzed Logical Plan ==
id: string, name: string, dt: string, id: string, name: string, dt: string
GlobalLimit 1000
+- LocalLimit 1000
+- Project [id#84, name#85, dt#86, id#87, name#88, dt#89]
+- SubqueryAlias `t`
+- Project [id#84, name#85, dt#86, id#87, name#88, dt#89]
+- Join Inner
:- SubqueryAlias `default`.`test_partition1`
: +- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]
+- SubqueryAlias `default`.`test_partition2`
+- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]

== Optimized Logical Plan ==
GlobalLimit 1000
+- LocalLimit 1000
+- Join Inner
:- HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]
+- HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]

== Physical Plan ==
CollectLimit 1000
+- CartesianProduct
:- Scan hive default.test_partition1 [id#84, name#85, dt#86], HiveTableRelation `default`.`test_partition1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#84, name#85], [dt#86]
+- Scan hive default.test_partition2 [id#87, name#88, dt#89], HiveTableRelation `default`.`test_partition2`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, [id#87, name#88], [dt#89]
通過邏輯計劃到物理計劃,以及最終的物理計劃選擇CartesianProduct,可以分析得出該SQL最終確實產(chǎn)生了笛卡爾積。
Spark SQL中產(chǎn)生笛卡爾積的處理策略
Spark SQL中主要有ExtractEquiJoinKeys(Broadcast Hash Join、Shuffle Hash Join、Sort Merge Join,這3種是我們比較熟知的Spark SQL join)和Without joining keys(CartesianProduct、BroadcastNestedLoopJoin)join策略。
那么,如何判斷SQL是否產(chǎn)生了笛卡爾積就迎刃而解。
  1. 在利用Spark SQL執(zhí)行SQL任務時,通過查看SQL的執(zhí)行圖來分析是否產(chǎn)生了笛卡爾積。如果產(chǎn)生笛卡爾積,則將任務殺死,進行任務優(yōu)化避免笛卡爾積?!静煌扑]。用戶需要到Spark UI上查看執(zhí)行圖,并且需要對Spark UI界面功能等要了解,需要一定的專業(yè)性。(注意:這里之所以這樣說,是因為Spark SQL是計算引擎,面向的用戶角色不同,用戶不一定對Spark本身了解透徹,但熟悉SQL。對于做平臺的小伙伴兒,想必深有感觸)】

  2. 分析Spark SQL的邏輯計劃和物理計劃,通過程序解析計劃推斷SQL最終是否選擇了笛卡爾積執(zhí)行策略。如果是,及時提示風險。具體可以參考Spark SQL join策略選擇的源碼:

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// --- BroadcastHashJoin --------------------------------------------------------------------
// broadcast hints were specified
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastByHints(joinType, left, right) =>
val buildSide = broadcastSideByHints(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
// broadcast hints were not specified, so need to infer it from size and configuration.
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastBySizes(joinType, left, right) =>
val buildSide = broadcastSideBySizes(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
// --- ShuffledHashJoin ---------------------------------------------------------------------
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
&& muchSmaller(right, left) ||
!RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
&& muchSmaller(left, right) ||
!RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
// --- SortMergeJoin ------------------------------------------------------------
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
joins.SortMergeJoinExec(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
// --- Without joining keys ------------------------------------------------------------
// Pick BroadcastNestedLoopJoin if one side could be broadcast
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastByHints(joinType, left, right) =>
val buildSide = broadcastSideByHints(joinType, left, right)
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastBySizes(joinType, left, right) =>
val buildSide = broadcastSideBySizes(joinType, left, right)
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
// Pick CartesianProduct for InnerJoin
case logical.Join(left, right, _: InnerLike, condition) =>
joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
case logical.Join(left, right, joinType, condition) =>
val buildSide = broadcastSide(
left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
// This join could be very slow or OOM
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
// --- Cases where this strategy does not apply ---------------------------------------------
case _ => Nil
}

13.具體講講Spark SQL/Hive中的一些實用函數(shù)

字符串函數(shù)
1. concat 對字符串進行拼接:concat(str1, str2, ..., strN) ,參數(shù):str1、str2...是要進行拼接的字符串。
-- return the concatenation of str1、str2、..., strN
-- SparkSQL
select concat('Spark', 'SQL');
2. concat_ws 在拼接的字符串中間添加某種分隔符:concat_ws(sep, [str | array(str)]+)。參數(shù)1:分隔符,如 - ;參數(shù)2:要拼接的字符串(可多個)
-- return the concatenation of the strings separated by sep
-- Spark-SQL
select concat_ws("-", "Spark", "SQL");
3. encode 設置編碼格式:encode(str, charset)。參數(shù)1:要進行編碼的字符串 ;參數(shù)2:使用的編碼格式,如UTF-8
-- encode the first argument using the second argument character set
select encode("HIVE", "UTF-8");
4. decode 轉(zhuǎn)碼:decode(bin, charset)。參數(shù)1:進行轉(zhuǎn)碼的binary ;參數(shù)2:使用的轉(zhuǎn)碼格式,如UTF-8
-- decode the first argument using the second argument character set
select decode(encode("HIVE", "UTF-8"), "UTF-8");
5. format_string / printf 格式化字符串:format_string(strfmt, obj, ...)
-- returns a formatted string from printf-style format strings
select format_string("Spark SQL %d %s", 100, "days");
6. initcap / lower / upper initcap:將每個單詞的首字母轉(zhuǎn)為大寫,其他字母小寫。單詞之間以空白分隔。upper:全部轉(zhuǎn)為大寫。lower:全部轉(zhuǎn)為小寫。
-- Spark Sql
select initcap("spaRk sql");

-- SPARK SQL
select upper("sPark sql");

-- spark sql
select lower("Spark Sql");
7. length
返回字符串的長度。
-- 返回4
select length("Hive");
8. lpad / rpad
返回固定長度的字符串,如果長度不夠,用某種字符進行補全。lpad(str, len, pad):左補全 rpad(str, len, pad):右補全 注意:如果參數(shù)str的長度大于參數(shù)len,則返回的結(jié)果長度會被截取為長度為len的字符串
-- vehi
select lpad("hi", 4, "ve");

-- hive
select rpad("hi", 4, "ve");

-- spar
select lpad("spark", 4, "ve");
9. trim / ltrim / rtrim
去除空格或者某種字符。trim(str) / trim(trimStr, str):首尾去除。ltrim(str) / ltrim(trimStr, str):左去除。rtrim(str) / rtrim(trimStr, str):右去除。
-- hive
select trim(" hive ");

-- arkSQLS
SELECT ltrim("Sp", "SSparkSQLS") as tmp;
10. regexp_extract
正則提取某些字符串
-- 2000
select regexp_extract("1000-2000", "(\\d+)-(\\d+)", 2);
11. regexp_replace
正則替換
-- r-r
select regexp_replace("100-200", "(\\d+)", "r");
12. repeat
repeat(str, n):復制給定的字符串n次
-- aa
select repeat("a", 2);
13. instr / locate
返回截取字符串的位置。如果匹配的字符串不存在,則返回0
-- returns the (1-based) index of the first occurrence of substr in str.

-- 6
select instr("SparkSQL", "SQL");

-- 0
select locate("A", "fruit");
14. space 在字符串前面加n個空格
select concat(space(2), "A");
15. split split(str, regex):以某字符拆分字符串 split(str, regex)
-- ["one","two"]
select split("one two", " ");
16. substr / substring_index
-- k SQL
select substr("Spark SQL", 5);

-- 從后面開始截取,返回SQL
select substr("Spark SQL", -3);

-- k
select substr("Spark SQL", 5, 1);

-- org.apache。注意:如果參數(shù)3為負值,則從右邊取值
select substring_index("org.apache.spark", ".", 2);
17. translate
替換某些字符為指定字符
-- The translate will happen when any character in the string matches the character in the `matchingString`
-- A1B2C3
select translate("AaBbCc", "abc", "123");
JSON函數(shù)
  1. get_json_object

-- v2
select get_json_object('{"k1": "v1", "k2": "v2"}', '$.k2');
  1. from_json

select tmp.k from (
select from_json('{"k": "fruit", "v": "apple"}','k STRING, v STRING', map("","")) as tmp
);
  1. to_json

-- 可以把所有字段轉(zhuǎn)化為json字符串,然后表示成value字段
select to_json(struct(*)) AS value;
時間函數(shù)
  1. current_date / current_timestamp 獲取當前時間

select current_date;

select current_timestamp;
  1. 從日期時間中提取字段/格式化時間 1)year、month、day、dayofmonth、hour、minute、second

-- 20
select day("2020-12-20");
2)dayofweek(1 = Sunday, 2 = Monday, ..., 7 = Saturday)、dayofye
-- 7
select dayofweek("2020-12-12");
3)weekofyear(date)
/**
* Extracts the week number as an integer from a given date/timestamp/string.
*
* A week is considered to start on a Monday and week 1 is the first week with more than 3 days,
* as defined by ISO 8601
*
* @return An integer, or null if the input was a string that could not be cast to a date
* @group datetime_funcs
* @since 1.5.0
*/
def weekofyear(e: Column): Column = withExpr { WeekOfYear(e.expr) }

-- 50
select weekofyear("2020-12-12");
4)trunc 截取某部分的日期,其他部分默認為01。第二個參數(shù): YEAR、YYYY、YY、MON、MONTH、MM
-- 2020-01-01
select trunc("2020-12-12", "YEAR");

-- 2020-12-01
select trunc("2020-12-12", "MM");
5)date_trunc 參數(shù):YEAR、YYYY、YY、MON、MONTH、MM、DAY、DD、HOUR、MINUTE、SECOND、WEEK、QUARTER
-- 2012-12-12 09:00:00
select date_trunc("HOUR" ,"2012-12-12T09:32:05.359");
6)date_format 按照某種格式格式化時間
-- 2020-12-12
select date_format("2020-12-12 12:12:12", "yyyy-MM-dd");
3. 日期時間轉(zhuǎn)換
1)unix_timestamp 返回當前時間的unix時間戳。
select unix_timestamp();

-- 1609257600
select unix_timestamp("2020-12-30", "yyyy-MM-dd");
2)from_unixtime 將unix epoch(1970-01-01 00:00:00 UTC)中的秒數(shù)轉(zhuǎn)換為以給定格式表示當前系統(tǒng)時區(qū)中該時刻的時間戳的字符串。
select from_unixtime(1609257600, "yyyy-MM-dd HH:mm:ss");
3)to_unix_timestamp 將時間轉(zhuǎn)化為時間戳。
-- 1609257600
select to_unix_timestamp("2020-12-30", "yyyy-MM-dd");
4)to_date / date 將時間字符串轉(zhuǎn)化為date。
-- 2020-12-30
select to_date("2020-12-30 12:30:00");
select date("2020-12-30");
5)to_timestamp 將時間字符串轉(zhuǎn)化為timestamp。
select to_timestamp("2020-12-30 12:30:00");
6)quarter 從給定的日期/時間戳/字符串中提取季度。
-- 4
select quarter("2020-12-30");
4. 日期、時間計算
1)months_between(end, start) 返回兩個日期之間的月數(shù)。參數(shù)1為截止時間,參數(shù)2為開始時間
-- 3.94959677
select months_between("1997-02-28 10:30:00", "1996-10-30");
2)add_months 返回某日期后n個月后的日期。
-- 2020-12-28
select add_months("2020-11-28", 1);
3)last_day(date) 返回某個時間的當月最后一天
-- 2020-12-31
select last_day("2020-12-01");
4)next_day(start_date, day_of_week) 返回某時間后the first date基于specified day of the week。參數(shù)1:開始時間。參數(shù)2:Mon、Tue、Wed、Thu、Fri、Sat、Sun。
-- 2020-12-07
select next_day("2020-12-01", "Mon");
5)date_add(start_date, num_days)
返回指定時間增加num_days天后的時間
-- 2020-12-02
select date_add("2020-12-01", 1);
6)datediff(endDate, startDate) 兩個日期相差的天數(shù)
-- 3
select datediff("2020-12-01", "2020-11-28");
7)關于UTC時間
-- to_utc_timestamp(timestamp, timezone) - Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in the given time zone, and renders that time as a timestamp in UTC. For example, 'GMT+1' would yield '2017-07-14 01:40:00.0'.

select to_utc_timestamp("2020-12-01", "Asia/Seoul") ;

-- from_utc_timestamp(timestamp, timezone) - Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in UTC, and renders that time as a timestamp in the given time zone. For example, 'GMT+1' would yield '2017-07-14 03:40:00.0'.

select from_utc_timestamp("2020-12-01", "Asia/Seoul");
常用的開窗函數(shù)
開窗函數(shù)格式通常滿足:
function_name([argument_list]) OVER ( [PARTITION BY partition_expression,…] [ORDER BY sort_expression, … [ASC|DESC]])
function_name: 函數(shù)名稱,比如SUM()、AVG()
partition_expression:分區(qū)列
sort_expression:排序列
注意:以下舉例涉及的表employee中字段含義:name(員工姓名)、dept_no(部門編號)、salary(工資)
  1. cume_dist

如果按升序排列,則統(tǒng)計:小于等于當前值的行數(shù)/總行數(shù)(number of rows ≤ current row)/(total number of rows)。如果是降序排列,則統(tǒng)計:大于等于當前值的行數(shù)/總行數(shù)。用于累計統(tǒng)計。
  1. lead(value_expr[,offset[,default]])

用于統(tǒng)計窗口內(nèi)往下第n行值。第一個參數(shù)為列名,第二個參數(shù)為往下第n行(可選,默認為1),第三個參數(shù)為默認值(當往下第n行為NULL時候,取默認值,如不指定,則為NULL)。
  1. lag(value_expr[,offset[,default]])

與lead相反,用于統(tǒng)計窗口內(nèi)往上第n行值。第一個參數(shù)為列名,第二個參數(shù)為往上第n行(可選,默認為1),第三個參數(shù)為默認值(當往上第n行為NULL時候,取默認值,如不指定,則為NULL)。
  1. first_value

取分組內(nèi)排序后,截止到當前行,第一個值。
  1. last_value

取分組內(nèi)排序后,截止到當前行,最后一個值。
  1. rank

對組中的數(shù)據(jù)進行排名,如果名次相同,則排名也相同,但是下一個名次的排名序號會出現(xiàn)不連續(xù)。比如查找具體條件的topN行。RANK() 排序為 (1,2,2,4)。
  1. dense_rank

dense_rank函數(shù)的功能與rank函數(shù)類似,dense_rank函數(shù)在生成序號時是連續(xù)的,而rank函數(shù)生成的序號有可能不連續(xù)。當出現(xiàn)名次相同時,則排名序號也相同。而下一個排名的序號與上一個排名序號是連續(xù)的。DENSE_RANK() 排序為 (1,2,2,3)。
  1. SUM/AVG/MIN/MAX

數(shù)據(jù):
id time pv
1 2015-04-10 1
1 2015-04-11 3
1 2015-04-12 6
1 2015-04-13 3
1 2015-04-14 2
2 2015-05-15 8
2 2015-05-16 6
結(jié)果:
SELECT id,
time,
pv,
SUM(pv) OVER(PARTITION BY id ORDER BY time) AS pv1, -- 默認為從起點到當前行
SUM(pv) OVER(PARTITION BY id ORDER BY time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS pv2, --從起點到當前行,結(jié)果同pv1
SUM(pv) OVER(PARTITION BY id) AS pv3, --分組內(nèi)所有行
SUM(pv) OVER(PARTITION BY id ORDER BY time ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS pv4, --當前行+往前3行
SUM(pv) OVER(PARTITION BY id ORDER BY time ROWS BETWEEN 3 PRECEDING AND 1 FOLLOWING) AS pv5, --當前行+往前3行+往后1行
SUM(pv) OVER(PARTITION BY id ORDER BY time ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS pv6 ---當前行+往后所有行
FROM data;
  1. NTILE

NTILE(n),用于將分組數(shù)據(jù)按照順序切分成n片,返回當前切片值。
NTILE不支持ROWS BETWEEN,比如 NTILE(2) OVER(PARTITION BY cookieid ORDER BY createtime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)。
如果切片不均勻,默認增加第一個切片的分布。
  1. ROW_NUMBER

從1開始,按照順序,生成分組內(nèi)記錄的序列。
比如,按照pv降序排列,生成分組內(nèi)每天的pv名次 ROW_NUMBER() 的應用場景非常多,比如獲取分組內(nèi)排序第一的記錄。
    八千里路云和月 | 從零到大數(shù)據(jù)專家學習路徑指南
    我們在學習Flink的時候,到底在學習什么?
    193篇文章暴揍Flink,這個合集你需要關注一下
    Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
    Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點
    我們在學習Spark的時候,到底在學習什么?
    在所有Spark模塊中,我愿稱SparkSQL為最強!
    硬剛Hive | 4萬字基礎調(diào)優(yōu)面試小總結(jié)
    數(shù)據(jù)治理方法論和實踐小百科全書
    標簽體系下的用戶畫像建設小指南
    4萬字長文 | ClickHouse基礎&實踐&調(diào)優(yōu)全視角解析
    【面試&個人成長】2021年過半,社招和校招的經(jīng)驗之談
    大數(shù)據(jù)方向另一個十年開啟 |《硬剛系列》第一版完結(jié)
    我寫過的關于成長/面試/職場進階的文章
    當我們在學習Hive的時候在學習什么?「硬剛Hive續(xù)集」

你好,我是王知無,一個大數(shù)據(jù)領域的硬核原創(chuàng)作者。

做過后端架構(gòu)、數(shù)據(jù)中間件、數(shù)據(jù)平臺&架構(gòu)&、算法工程化。

專注大數(shù)據(jù)領域?qū)崟r動態(tài)&技術提升&個人成長&職場進階,歡迎關注。

瀏覽 55
點贊
評論
收藏
分享

手機掃一掃分享

分享
舉報
評論
圖片
表情
推薦
點贊
評論
收藏
分享

手機掃一掃分享

分享
舉報

感谢您访问我们的网站,您可能还对以下资源感兴趣:

国产秋霞理论久久久电影-婷婷色九月综合激情丁香-欧美在线观看乱妇视频-精品国avA久久久久久久-国产乱码精品一区二区三区亚洲人-欧美熟妇一区二区三区蜜桃视频 日韩免费高清视频| 屁屁影院CCYYCOM国产| 码人妻免费视频| 中文字幕无码在线视频| 成人黄色导航| 亚洲区在线| 婷婷国产亚洲精品网站| 亚洲av无码精品| 北条麻妃AV在线播放| 久久精品视频免费观看| 青娱乐成人网| 国产无码黄片| 天天看天天操| 国产精品视频在线播放| 五月天久久婷婷| 亚洲精品乱码久久久久久按摩观 | 无码网址| 成人小说亚洲一区二区三区| 丁香五月色情| 91中文| 狼友视频首页| 色婷婷无码| 久久无码人妻精品一区二区三区| 国产一卡二卡在线观看| 国产高清无码片| 国产愉拍91九色国产愉拍| 99成人国产精品视频| 久在草| 91.www91成人影视在线观看91成人网址9 | 亚洲视频在线观看播放| 国产久久精品| 一区二区三区久久| 九九视频网| 日韩精品人妻中文字幕| 国产无码久久久| 亚洲成人久久久| 人妻丝袜中出北条麻妃| 新版欧美内射大全| 亚洲天堂中文字幕| 日本成人电影一区二区三区| 国产精品揄拍一区二区| 天堂a在线8| 亚洲精品内射| 免费观看黄片视频| 欧美精品久久| 色99在线| 三级片无码视频| 美女毛片网站| 欧美黄色毛片| 18sav| 日本九九视频| 婷婷丁香激情五月天| 男女免费av| 亚洲高清无码在线播放| 高清毛片AAAAAAAAA片| 国产91网| 99热思思| 欧美女人操逼| 亚洲国产精品18久久久久久| 色婷婷在线视频观看| 国产毛片18水真多18精品| 91麻豆福利视频| 精品免费囯产| 国产真实露脸乱子伦对白高清视频| 九九草在线视频| 国产亚洲久一区二区三区| 很很日| 国产精品不卡在线观看| 激情av在线观看| 成人亚洲在线| 加勒比久久88| 日韩免费高清无码| 人人摸人人操人人爽| 岛国无码破解AV在线播放| 麻豆视屏| 久草新在线| 日韩中文字幕在线高清| 日韩AV在线电影| 日韩性爱av| 人妻一区二区三区| 91精品人妻少妇无码影院| 成人AV在线看| 欧美亚洲中文字幕| 色国产视频| 97在线超碰| 亚洲秘无码一区二区三区| 午夜天堂精品久久久久| 男女av免费| 狠狠撸天天日| 亚洲自拍无码| 欧美日韩a片| 特黄网站| 可以在线观看的AV| 日本黄色精品| 久久国产精品网站| 国产精品国产精品国产专区不52| 亚洲在线免费观看| 97久久人人| 五月丁香色色网| 亚洲午夜福利在线观看| 亚洲激情无码视频| 亚洲高清视屏| 一级免费黄片| 免费无码国产在线55| 亚洲AV秘一区二区色盗战流出| 成人黄片免费| 日韩在线国产| 夜夜撸| 人人色视频| 国产美女自拍| AV在线资源| 亚洲精品久久久久久久久豆丁网 | 青娱乐亚洲| 91在线成人| 久久动态图| 伊人97| 欧美一区二区三区精品| 四虎成人精品永久免费AV九九| 免费黄色成人网站| 国产乱伦免费视频| 亚洲福利一区二区| 国产91无码精品秘入口新欢| 日本在线一区| 综合狠狠| 亚洲无码电影在线观看| 亚洲激情无码视频| www.黄色大片| 欧美日韩三级片| 一区二区国产精品| 日韩欧美高清在线| 国产三级国产三级国产普通话| 熟妇人妻中文AV| 欧美区亚洲区| 久久久久久黄| 日韩中文视频| 国产成人无码区免费AV片在线| 三级成人视频| 伊人色爱| 成人精品无码| 99久久久国产精品无码| 风流老熟女一区二区三区| 亚洲色视频在线观看| 首页-91n| 91无码在线视频| 九色PORNY国产成人| 欧美性猛交XXXX乱大交| 国产v片| 日韩性爱AV| 我爱大香蕉| 五月婷婷中文版| 中文字幕一级A片免费看| 少妇搡BBBB搡BBB搡视频一级| 日韩一级片免费看| 亚洲成免费| 丁香五香天堂| 五月婷婷综合网| 色婷婷色99国产综合精品| 男女av免费| 亚洲午夜福利视频在线观看| AV草逼| 欧美成人视频在线观看| 91麻豆精品91久久久ios版| 粉嫩小泬粉嫩小泬在线| 黄色片网站视频| 狠狠躁婷婷天天爽综合| 国产一级在线| 日韩极品视频| 成人伊人综合| 日韩一级片免费| 激情人妻网站| 色色色免费视频| 欧美一二三区黄色免费视屏| 日韩AV电影在线观看| 中国黄色大片| 黑人巨粗进入疼哭A片| 波多野结衣国产区42部| 337P大胆粉嫩银噜噜噜| 日本麻豆| 手机看片午夜福利网| 人人澡人人澡| 国产h视频在线观看| 欧美日一区二区三区| 国产不卡在线视频| 大香蕉伊人综合在线| 亚洲成人一| 九九九在线观看视频| 北条麻妃无码| 在线观看黄色小视频| 内射国产| 亚洲无码成人片| www.热久久| ChineSe露脸老女人| 国产成人毛片18女人18精品| 久久久久国产一区二区三区四区| 日韩一级片在线观看| 五月天福利视频| 中文字幕免费在线视频| 壁特壁视频在线观看| 天堂久久久久| 国产精品色哟哟| 日韩黄色A级片| 亚洲无码播放| 日本一级a片| 91豆花视频| 国产又大又粗又长| 婷婷丁香综合| 香蕉视频亚洲| 色网站在线观看| 天天看片天天爽| 狼人一区二区| 黄色视频免费国产| 国产又爽又黄免费网站在| 日韩精品无码AV| 日韩加勒比在线| 亚洲黄色在线播放| 无码在线看| 国产精品伦子伦免费视频| 婷婷好色五月天| 国产一区视频在线| 成人精品永久免费视频99久久精品 | 91九色丨国产丨爆乳| 亚洲v天堂| www.国产视频| 无码一二三区| 日韩黄色网址| 中文字幕视频在线| 天堂一区二区| 中文字幕巨乱亚洲高清A片28| 久久精品一区二区| 一区二区三区欧美| 北条麻妃无码视频| 青草中文娱乐网在线| 日韩高清国产一区在线| 99热免费| 天堂网址激情网址| 成人区色情综合小说| 国产精品91久久久| 91久久性奴调教| 免费视频在线观看黄| 国产黄色电影在线| 日韩激情一区二区| 色欲精品| 俺也来www俺也色com| 东北老女人性爱视频| 综合合一品道| 91麻豆精品国产91久久久久久| 成人图片小说| 国产欧美日韩一区二区三区| 无码AV中文字幕| 免费v片在线观看| 俺也去啦WWW色官网| 无码高清一区| 鸡巴网站| 无码高清视频| 国产三级日本三级国产三级| 国产美女被| 在线操逼视频| 成人无码精品亚洲| 高清无码日本| 国产在线观看无码免费视频| 国产精品国产精品国产专区不片| 人人澡人人澡| 日韩字幕久久| 国产一级免费观看| 亚洲AV无码黑人专区| 俺去啦俺去啦| 欧美成人在线免费视频| 内射久久| 91拍真实国产伦偷精品| 日批视频| 亚洲欧洲在线播放| 8050午夜一级| 大香蕉日| 成人激情视频网| 色色色99| 久草资源| 大香蕉伊人操| 久久久久黄片| 中文无码字幕在线| 日韩乱伦电影| 午夜精品久久久久久久99老熟妇| 最新久欠一区二区免费看| 日韩操比视频| 操逼网五月天| 人人操人人摸人人爱| 日屁视频| 中文字幕三级av片| 在线看一区二区三区| 草逼毛片| 黄页免费视频| 一本道高清无码视频| 亚洲欧美日韩黑料吃瓜在线观看| 青草在线视频| 一级a免一级a做片免费| 国产靠逼视频| 亚洲无线观看| 天天干天天日天天操| 日本色影院| 成人毛片18女人毛片真水| 亚洲二级片| 黄色三级视频| 亚洲综合伊人无码| 欧美精品操逼| 大香蕉a片| 97碰碰碰| 中文字幕东京热加勒比| 美女被操面费网站| 懂色av懂色av粉嫩av无码| 中文在线字幕电视剧免费平台| 青青草五月天色婷婷丁香| 国产在线你懂得| 69人妻人人澡人人爽久久| 中文字幕精品亚洲熟女| 国产免费黄色| 欧美色图在线观看视频| 免费看黄色A片| 3d动漫精品一区二区三区在线观看 | 日韩人妻精品中文字幕专区不卡| 国产精品久久久久久久久久九秃 | 亚洲骚逼| 日韩高清久久| 超碰成人在线免费观看| 欧美拍拍视频| 91精品婷婷国产| 人人操人人操人人操人人操| 国产一级黄色录像| 男人色天堂| 亚洲综合一区二区三区| 免费看一级黄色片| 人人妻人人摸| 亚洲Av无码午夜国产精品色软件| 激情小说五月天| 狼友视频在线| 国产乱子伦一区二区三区视频| 一区二区三区四区精品视频| 三级av在线| 中文字幕高清视频| 二区三区免费视频| 欧美老妇性猛交| 日韩午夜无码| www.国产| 亚洲激情| 黄片网址大全| 亚洲黄片免费| 久久久久久麻豆| 在线免费观看网站| 操逼动漫| 九九成人免费视频| 欧美三级毛片| 午夜成人福利视频在线观看| 超碰天堂| 91丨九色丨熟女老版| 福利所导航| 亚洲一级免费视频| 国产做爱导航| 欧美成人无码一区二区三区| 亚洲无码在| 大色鬼在线天堂精品| 丰满熟妇| 2016av天堂网| 乱子伦毛片国产| 97福利视频| 好吊妞在线观看| H片在线免费观看| 91精品国产一区二区三区四区大 | 国产亚洲无码激情前后夹击| 嗯啊av| 99亚洲无码| 中文字幕+乱码+中文字幕电视剧| 成人片成人网久久蜜桃臀| 中文字幕少妇| 特级444WWW大胆高清| 能看毛片的网站| jjzz亚洲| 特级西西444www| 国产精品无码成人AV在线播放| 欧美96| 四色永久成人网站| 日韩AAA在线| 中日韩特黄A片免费视频| 激情婷婷| 喷水视频在线观看| 人人妻人人澡| www.97cao| 无码av在线播放| 欧美色图在线观看视频| AV在线直播| 亚洲熟妇在线观看| 日本午夜福利电影| 精品国产AV无码一区二区三区 | 久久久久三级| 51国产黑料吃瓜在线入口| 国产免费黄色| 2025中文字幕在线| 久久69| 日本无码视频在线观看毒| 自拍偷拍一区二区| 成人网站在线| 日本在线无码| 亚洲色图欧美| 中文字幕欧美激情| 婷婷欧美| 91综合视频在线播放| 婷婷精品视频| 99成人在线视频| 777久久久| 亚洲猛男操逼欧美国产视频| 狠狠操天天干| 亚洲精品偷拍| 黄色一级大片在线免费看国产| 成人精品久久久| 久操网在线视频| 精品视频91| 日逼网站国产| 91在线免费视频观看| 特级西西444WWW高清大视频| 亚洲性爱影院| 夜夜夜夜骑| 香蕉国产2023| 色久悠悠综合网| 亚洲无码专区在线| 国产一级婬乱A片| 91日韩在线| 国产尤物视频| av操操操| 欧美日韩精品在线| 亚洲无码av在线观看| 少妇搡BBBB搡BBB搡视频一级| 国产福利在线导航| 青青草免费公开视频| 黄色免费a级片一级片| 久久香蕉网站| 大香蕉综合| 婷婷综合亚洲| 久久久久久97电影院电影院无码 | 无码内射在线播放| 国产精品2025| 成人一级A片| 99视频精品全部免费看| 啪啪成人视频| 亚洲无码视频观看| 久操福利视频| 伊人影院在线看| 欧美日韩免费在线播放电影在线播放电影在线播放电影免费 | 国产aⅴ激情无码久久久无码 | 天天射综合| 中文字幕不卡无码| 91人人妻人人妻人人澡| 国产一区二区成人久久919色| 欧美一级黃色A片免费看蜜桃熟了| 91内射| 蜜桃视频成人app| 久草电影在线观看| 欧一美一伦一A片| 农村A片婬片AAA毛片| 久久午夜福利电影| 亚洲中文无码av| 影音先锋日韩资源| 婷婷福利导航| 日本久久婷婷| 双飞人妻13p| 日韩区在线| 激情淫荡少妇| 国产精品成人无码a无码| 狠狠干2024| 私人玩物』黑絲OL尤物| 91九色在线观看| 国产精品天天狠天天看| 国产97视频| 一区二区三区黄色| A片免费网址| 人人澡av| mm131亚洲国产精品久久| 人人澡人人澡人人澡| 成人电影久久久| 天堂视频在线观看亚洲美女| 国产乱码在线| 黄色大片免费在线观看| 国产农村乱婬片A片AAA图片| 草久美女| 91在线小视频| 操人视频在线观看| 3D动漫精品啪啪一区二区| 亚洲十八禁| 成人亚洲在线| 久久99精品国产.久久久久久| 欧美色一级| 8050网午夜| AV乱伦网站| 国产剧情一区二区| 佐山爱人妻无码蜜桃| 在线看片a| 91黄色电影| 99热这里精品| 欧美一区三区| 91丝袜在线| 白虎高清无码大尺度免费在线观看| 五月婷婷激情网| AV黄色网| 精国产品一区二区三区A片| 国产精品一二| 高清国产mv在线观看| 国产白丝精品91爽爽久久| 国产高清在线免费观看AV片| 狠狠干综合| 九九99精品| 五月婷婷综合在线| 五月天婷婷影院影院| 黄页网站视频| 亚洲黄色电影网站| 国产高清一区二区三区| 欧美又粗又大| 欧美操BB| 精品视频网| 中文字幕欧美激情| 日韩精品中文无码| 一级黄片在线| av影音先锋| 毛片黄片| 国产免费久久| 无码一区二区三区四| 国产日韩欧美在线播放| 在线色网站| 欧美操逼电影| 国产成人a亚洲精品www| 99re| 国产黄色片在线观看| av中文字幕网| 精品成人一区二区三区| 久久私拍| 欧美性猛交XXXX乱大交HD| 在线播放91灌醉迷J高跟美女| 久久性爱免费视频| 久久这里有精品视频| 久久丁香五月婷婷五月天激情视频 | 国产成人精品一区二区三区视频 | 最新免费毛片| 婷婷丁香五月激情一区综合网 | 在线观看免费人成视频| 久久免费成人| 免费视频| 91亚洲国产AⅤ精品一区二区| 久久久久久久久久成人永久免费视频 | 亚洲a电影| 日韩日韩日韩日韩日韩| 亚洲无码久久网| 天天夜夜狠狠| av无码导航| 日本黄在线播放| 香蕉av在线播放| 美国久久久| 久久五月天婷婷| 亚洲第一色网| 精品九九九九| 97国产超碰| 中文字幕一区二区6页| 中文字幕99| 国产91在线亚洲| 久久久久久久精| 日本高清无码视频| 麻豆亚洲AV成人无码久久精品 | 搡中国东北老女人视频| sese在线| 日韩在线中文字幕视频| 天天日天天撸| 欧美精品日韩在线观看| 日本天堂在线| www亚洲视频| 日韩免费不卡| 丁香激情五月少妇| 97久久一区二区| 亚洲欧美在线免费观看| 翔田千里AV| 西西444WWW无码精品| www四虎| 美女日屄| 青春草在线观看视频| 俺去啦在线| 黄色一级录像| 91网址| 在线看黄片| 久久人妻中文字幕| 91久久国产综合久久91| 日韩一级A片| 中文字幕精品视频| 亚洲AAA| 亚洲综合一二三区| 你懂的久久| 五月天视频网| 黄色av网站免费| 高清无码免费观看视频| 日韩第1页| 亚洲中文久久| 超碰人妻97| 亚洲精品中文字幕在线观看| 亚洲一级性爱| 无码在线不卡| 黄片在线免费观看视频| 五月婷婷色色| av无码在线观看| 亚洲综合免费观看高清完整版在线| 亚洲影视中文字幕| 大香蕉一级红色片青青河边草| 免费成人在线网站| 国产日韩欧美久久| ww亚洲ww| 国产丝袜在线视频| 日韩午夜福利| 免费在线观看黄色| 搡BBB搡BBBB搡BBBB-百度| 日韩AV无码免费| 三级理论网站| 国产成人无码一区二区在线观看| 99热在线观看免费精品| 国产专区在线| 大香蕉福利视频| 无码av亚洲一区二区毛片公司| 欧美三级在线观看视频| 在线播放亚洲| 自拍偷拍综合网| 97人妻一区二区三区| 狠狠的日| 亚洲激情成人| 99热激情在线| 人妻丝袜无码视频专区| 五月婷婷丁香综合| 亚洲无码久久久| 亚洲一级二级片| 亚洲中文中出| 蜜臀网在线| 黄色片A片| 色妞视频精品一区| 久久不雅视频| www.a片| 99色综合| 久久国产精品视频| 视频一区在线播放| 亚洲无码色色| 国产乱妇乱子伦视频免费观看让女人 | 蜜桃成人无码区免费视频网站| 亚洲高清无码在线视频| 国产18| 3DAV一区二区三区动漫| BBw日本熟妇BBwHD| 国产一卡二卡在线观看| 91丨露脸丨熟女| 岛国av片| 乱伦激情| 日逼天堂| 丝袜制服中文字幕无码专区| 狠狠做深爱婷婷久久综合一区| 久久黄色视频免费观看| 国产在线视频你懂的| 一区二区操逼| 国产三级一区二区| 中文字幕日本在线| 伊人在线| 色婷婷激情在线| 日韩高清无码三级片| 亚洲无码AV电影| 日韩无码高清网站| 欧美BBWBBWBBWBBWBBwBBW| 四川乱子伦95视频国产| 奇米成人片| 五月婷婷综合激情| 国产一级婬片A片AAA樱花| 免费黄色小视频| 欧美色女人| www.91国产| 日本免费高清视频在线观看一区 | 欧美性爱天天操| 亚洲色诱| 大色鬼在线天堂精品| 欧美高清无码视频| 超碰欧美| 91精品视频在线免费观看| 国产精品美女毛片j酒店| 日本黄色免费看| 摸BBB槡BBBB搡BBB,,,,,| 日韩AV一区二区三区四区| 亚洲无码福利视频| 在线看黄网| 亚洲无码AV片| 口爆在线| av免费观看网址| 影音先锋国产| 91精品国产闺蜜国产在线闺蜜 | 在线免费观看成人网站| 天天色天天日天天干| AV三级无码| 最新中文字幕av| 91视频播放| 国产尤物在线观看| 国产一级婬片A片AAA樱花| 美女视频一区二区三区| 久久久精品网站| 成人AV天堂| 日韩av综合| 精品一区二区三区免费毛片| 波多野结衣成人网站| 围内精品久久久久久久久久‘变脸| 激情五月婷婷| 国产激情无码免费| 亚洲有码在线观看| 日本一区二区三区四区在线观看| 巜痴漢電車~凌脔版2| 99热精品免费在线观看| 成人网站欧美| 欧美日韩一| 五月天婷婷黄色| 99久久综合| 四虎影成人精品A片| 国产免费AV片| 操鸡视频在线观看| 中文字幕亚洲在线观看| 久久久精品淫秽色情| 打炮影院| 亚洲福利社| 天天艹夜夜艹| 免看一级a一片| 久久久无码精品亚洲| 国产又粗又大又黄视频| 一区二区三区成人电影| 久久天堂av| 国产黄色精品视频| 91国视频| 无码群交| 99热官方网站| 一级生活片| 韩国无码高清视频| 亚洲天天操| 精品无码视频在线| 一区二区三区无码视频| 特一级黄色视频| 躁BBB躁BBB躁BBBBBB日| 国产免费av片| 成人精品网| 高清无码在线不卡| 夜夜撸夜夜| 操人人| 四虎福利| 五月丁香久久| 一道本无码在线播放| 欧美爱| 69视频国产| 蜜臀无码在线| 国产伦精品一区二区三区妓女| 91AV在线免费观看| 躁BBB躁BBB躁BBBBBB日| 婷婷在线电影| 岛国av片| 久草中文视频| 国内精品久久久久久久久久变脸| 欧美一区二区三区在线| www.91com| 亚洲不卡免费视频| 成人午夜福利电影| 91久久无码一区人妻A片蜜桃| 北条麻妃在线无码| 欧美a区| 亚洲欧美在线一区| 九九九精品视频| 中文字幕第72页| 熊猫视频91| 69国产精品| 99高清国产| 苍井空亚洲精品AA片在线播放| 亚洲AV无码国产综合专区| 国产精品久久久久久最猛| 中文字幕av无码| 国产在线高清| 俩小伙3p老熟女露脸| 91精品网站| 91亚洲精品国偷拍自产在线观看| 一本一道vs波多野结衣| 国产精品午夜成人免费| 免费视频在线观看一区| 亚洲无码高清在线观看视频| 无码狠狠躁久久久久久久91 | 日韩欧美国产成人| 久久影院av| 久久精品在线观看| 黄片免费播放| 精品国精品自拍自在线| 狠狠久久| 国精产品一品二品国精| 亚洲天堂在线视频播放| 性视频人人| 亚洲AV无码国产综合专区| 欧美男女交配视频| 久久三级| 91精品久久久久久粉嫩| 性欧美欧美巨大69| 国产三级国产三级国产普通话| 三级片无码在线播放| 国产成人大香蕉| 中文字幕天天在线| 波多野结衣不卡| 婷婷五月综合在线| 欧美成人网站免费在线观看| 九九草在线视频| 久久成人片| 俺也去大香蕉| 久久婷婷五月丁香| 精品一区二区三区毛片| 热99| 伊人九九热| 久久久久亚洲AV无码网影音先锋| 国产久久久久| 91成人一区| 伊香蕉大综综综合| 成人五月天黄色电影| 色婷婷视频在线观看| 日韩a电影| 亚洲第一视频在线观看| 三级黄,色| 国产一区二区免费看| 免费看黄色大片| 亚洲中文字幕网站| 怡春院视频| 午夜成人精品一区二区三区| 欧美亚洲综合手机在线| 久久99精品久久久久久| 伊大香蕉| 日本在线视频不卡| 国产精品18在线| 青青伊人网| 无码人妻精品一区二区三千菊电影| 在线观看免费视频黄| 久草网在线观看| 久久er视频| 在线观看视频一区| 一区二区三区在线免费观看| 国产三级三级三级| 色婷婷激情综合网| 午夜免费播放观看在线视频| 影音先锋成人电影| 亚洲一卡二卡| 九九九成人网| 黄色视频小说| 国产一级a毛一级a毛视频在线网站)| 又大又长又粗91| 欧美性视频网站| 日韩免费高清| 日日精品| www.51av| 成人福利网站| 五月丁香花| 丁香五月天AV| 色天使AV| 婷婷丁香五月亚洲| 大香蕉在线视频观看| 日韩精品一区二区三区四在线播放| 国产激情av| 无码高潮| 免费的操逼视频| 呦小BBBB小小BBBB| 成人亚洲AV| 夜夜嗨AⅤ一区二区三区| 蜜芽成人在线视频| 国产免看一级a一片成人aⅴ| 健身房被教练3p喷水了| 蜜臀久久99精品久久久巴士| 国产黄色自拍视频| 国产爱搞| 久久久久久AV| 久久93| 午夜无码av| 国产性爱精品| 影音先锋在线视频| 国产—a毛—a毛A免费看图| 日韩二三区| 日韩视频中文字幕在线| 九色丨蝌蚪丨老版熟女| 欧美插插| 影音先锋三区| 一区二区三区福利| 天天日天天草天天干| 少妇人妻在线| 亚洲热在线视频| 久久亚洲AV成人无码国产野外| 欧美在线视频一区二区| 国产秘精品区二区三区日本| 免费黄色av网址| 蜜臀久久精品久久久久| 国产污视频在线观看| 国产无码观看| 99黄网| 99er在线| 成人久久久久一级大黄毛片中国| 国产毛片毛片毛片毛片毛片| 激情综合久久| 尤物精品| 亚洲无码视频网站| www.黄色在线观看| 国产女人18水真多18精品一级做| 激情深爱五月| 国产在线视频一区| 日韩在线精品| 陈冠希和张柏芝mv|