百萬(wàn)級(jí)數(shù)據(jù)批量讀寫(xiě)入MySQL
Spark SQL讀取MySQL的方式
Spark SQL還包括一個(gè)可以使用JDBC從其他數(shù)據(jù)庫(kù)讀取數(shù)據(jù)的數(shù)據(jù)源。與使用JdbcRDD相比,應(yīng)優(yōu)先使用此功能。這是因?yàn)榻Y(jié)果作為DataFrame返回,它們可以在Spark SQL中輕松處理或與其他數(shù)據(jù)源連接。JDBC數(shù)據(jù)源也更易于使用Java或Python,因?yàn)樗恍枰脩籼峁〤lassTag。
可以使用Data Sources API將遠(yuǎn)程數(shù)據(jù)庫(kù)中的表加載為DataFrame或Spark SQL臨時(shí)視圖。用戶可以在數(shù)據(jù)源選項(xiàng)中指定JDBC連接屬性。user和password通常作為用于登錄數(shù)據(jù)源的連接屬性。除連接屬性外,Spark還支持以下不區(qū)分大小寫(xiě)的選項(xiàng):
| 屬性名稱 | 解釋 |
|---|---|
url | 要連接的JDBC URL |
dbtable | 讀取或?qū)懭氲腏DBC表 |
query | 指定查詢語(yǔ)句 |
driver | 用于連接到該URL的JDBC驅(qū)動(dòng)類(lèi)名 |
partitionColumn, lowerBound, upperBound | 如果指定了這些選項(xiàng),則必須全部指定。另外, numPartitions必須指定 |
numPartitions | 表讀寫(xiě)中可用于并行處理的最大分區(qū)數(shù)。這也確定了并發(fā)JDBC連接的最大數(shù)量。如果要寫(xiě)入的分區(qū)數(shù)超過(guò)此限制,我們可以通過(guò)coalesce(numPartitions)在寫(xiě)入之前進(jìn)行調(diào)用將其降低到此限制 |
queryTimeout | 默認(rèn)為0,查詢超時(shí)時(shí)間 |
fetchsize | JDBC的獲取大小,它確定每次要獲取多少行。這可以幫助提高JDBC驅(qū)動(dòng)程序的性能 |
batchsize | 默認(rèn)為1000,JDBC批處理大小,這可以幫助提高JDBC驅(qū)動(dòng)程序的性能。 |
isolationLevel | 事務(wù)隔離級(jí)別,適用于當(dāng)前連接。它可以是一個(gè)NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,對(duì)應(yīng)于由JDBC的連接對(duì)象定義,缺省值為標(biāo)準(zhǔn)事務(wù)隔離級(jí)別READ_UNCOMMITTED。此選項(xiàng)僅適用于寫(xiě)作。 |
sessionInitStatement | 在向遠(yuǎn)程數(shù)據(jù)庫(kù)打開(kāi)每個(gè)數(shù)據(jù)庫(kù)會(huì)話之后,在開(kāi)始讀取數(shù)據(jù)之前,此選項(xiàng)將執(zhí)行自定義SQL語(yǔ)句,使用它來(lái)實(shí)現(xiàn)會(huì)話初始化代碼。 |
truncate | 這是與JDBC writer相關(guān)的選項(xiàng)。當(dāng)SaveMode.Overwrite啟用時(shí),就會(huì)清空目標(biāo)表的內(nèi)容,而不是刪除和重建其現(xiàn)有的表。默認(rèn)為false |
pushDownPredicate | 用于啟用或禁用謂詞下推到JDBC數(shù)據(jù)源的選項(xiàng)。默認(rèn)值為true,在這種情況下,Spark將盡可能將過(guò)濾器下推到JDBC數(shù)據(jù)源。 |
源碼
SparkSession
/**
???*?Returns?a?[[DataFrameReader]]?that?can?be?used?to?read?non-streaming?data?in?as?a
???*?`DataFrame`.
???*?{{{
???*???sparkSession.read.parquet("/path/to/file.parquet")
???*???sparkSession.read.schema(schema).json("/path/to/file.json")
???*?}}}
???*
???*?@since?2.0.0
???*/
??def?read:?DataFrameReader?=?new?DataFrameReader(self)
DataFrameReader
??//?...省略代碼...
??/**
???*所有的數(shù)據(jù)由RDD的一個(gè)分區(qū)處理,如果你這個(gè)表很大,很可能會(huì)出現(xiàn)OOM
???*可以使用DataFrameDF.rdd.partitions.size方法查看
???*/
??def?jdbc(url:?String,?table:?String,?properties:?Properties):?DataFrame?=?{
????assertNoSpecifiedSchema("jdbc")
????this.extraOptions?++=?properties.asScala
????this.extraOptions?+=?(JDBCOptions.JDBC_URL?->?url,?JDBCOptions.JDBC_TABLE_NAME?->?table)
????format("jdbc").load()
??}
/**
???*?@param?url?數(shù)據(jù)庫(kù)url
???*?@param?table?表名
???*?@param?columnName?分區(qū)字段名
???*?@param?lowerBound??`columnName`的最小值,用于分區(qū)步長(zhǎng)
???*?@param?upperBound??`columnName`的最大值,用于分區(qū)步長(zhǎng).
???*?@param?numPartitions?分區(qū)數(shù)量?
???*?@param?connectionProperties?其他參數(shù)
???*?@since?1.4.0
???*/
??def?jdbc(
??????url:?String,
??????table:?String,
??????columnName:?String,
??????lowerBound:?Long,
??????upperBound:?Long,
??????numPartitions:?Int,
??????connectionProperties:?Properties):?DataFrame?=?{
????this.extraOptions?++=?Map(
??????JDBCOptions.JDBC_PARTITION_COLUMN?->?columnName,
??????JDBCOptions.JDBC_LOWER_BOUND?->?lowerBound.toString,
??????JDBCOptions.JDBC_UPPER_BOUND?->?upperBound.toString,
??????JDBCOptions.JDBC_NUM_PARTITIONS?->?numPartitions.toString)
????jdbc(url,?table,?connectionProperties)
??}
??/**
???*?@param?predicates?每個(gè)分區(qū)的where條件
???*?比如:"id <= 1000", "score > 1000 and score <= 2000"
???*?將會(huì)分成兩個(gè)分區(qū)
???*?@since?1.4.0
???*/
??def?jdbc(
??????url:?String,
??????table:?String,
??????predicates:?Array[String],
??????connectionProperties:?Properties):?DataFrame?=?{
????assertNoSpecifiedSchema("jdbc")
????val?params?=?extraOptions.toMap?++?connectionProperties.asScala.toMap
????val?options?=?new?JDBCOptions(url,?table,?params)
????val?parts:?Array[Partition]?=?predicates.zipWithIndex.map?{?case?(part,?i)?=>
??????JDBCPartition(part,?i)?:?Partition
????}
????val?relation?=?JDBCRelation(parts,?options)(sparkSession)
????sparkSession.baseRelationToDataFrame(relation)
??}
示例
?private?def?runJdbcDatasetExample(spark:?SparkSession):?Unit?=?{
????
????//?從JDBC?source加載數(shù)據(jù)(load)
????val?jdbcDF?=?spark.read
??????.format("jdbc")
??????.option("url",?"jdbc:mysql://127.0.0.1:3306/test")
??????.option("dbtable",?"mytable")
??????.option("user",?"root")
??????.option("password",?"root")
??????.load()
????val?connectionProperties?=?new?Properties()
????connectionProperties.put("user",?"root")
????connectionProperties.put("password",?"root")
????val?jdbcDF2?=?spark.read
??????.jdbc("jdbc:mysql://127.0.0.1:3306/test",?"mytable",?connectionProperties)
????//?指定讀取schema的數(shù)據(jù)類(lèi)型
????connectionProperties.put("customSchema",?"id?DECIMAL(38,?0),?name?STRING")
????val?jdbcDF3?=?spark.read
??????.jdbc("jdbc:mysql://127.0.0.1:3306/test",?"mytable",?connectionProperties)
??}
值得注意的是,上面的方式如果不指定分區(qū)的話,Spark默認(rèn)會(huì)使用一個(gè)分區(qū)讀取數(shù)據(jù),這樣在數(shù)據(jù)量特別大的情況下,會(huì)出現(xiàn)OOM。在讀取數(shù)據(jù)之后,調(diào)用DataFrameDF.rdd.partitions.size方法可以查看分區(qū)數(shù)。
Spark SQL批量寫(xiě)入MySQL
代碼示例如下:
object?BatchInsertMySQL?{
??case?class?Person(name:?String,?age:?Int)
??def?main(args:?Array[String]):?Unit?=?{
????//?創(chuàng)建sparkSession對(duì)象
????val?conf?=?new?SparkConf()
??????.setAppName("BatchInsertMySQL")
????val?spark:?SparkSession?=??SparkSession.builder()
??????.config(conf)
??????.getOrCreate()
????import?spark.implicits._
????//?MySQL連接參數(shù)
????val?url?=?JDBCUtils.url
????val?user?=?JDBCUtils.user
????val?pwd?=?JDBCUtils.password
????//?創(chuàng)建Properties對(duì)象,設(shè)置連接mysql的用戶名和密碼
????val?properties:?Properties?=?new?Properties()
????properties.setProperty("user",?user)?//?用戶名
????properties.setProperty("password",?pwd)?//?密碼
????properties.setProperty("driver",?"com.mysql.jdbc.Driver")
????properties.setProperty("numPartitions","10")
????//?讀取mysql中的表數(shù)據(jù)
????val?testDF:?DataFrame?=?spark.read.jdbc(url,?"test",?properties)
?????println("testDF的分區(qū)數(shù):??"?+?testDF.rdd.partitions.size)
???testDF.createOrReplaceTempView("test")
???testDF.persist(StorageLevel.MEMORY_AND_DISK)
???testDF.printSchema()
????val?result?=
??????s"""--?SQL代碼
???????????????""".stripMargin
????val?resultBatch?=?spark.sql(result).as[Person]
????println("resultBatch的分區(qū)數(shù):?"?+?resultBatch.rdd.partitions.size)
????//?批量寫(xiě)入MySQL
????//?此處最好對(duì)處理的結(jié)果進(jìn)行一次重分區(qū)
????//?由于數(shù)據(jù)量特別大,會(huì)造成每個(gè)分區(qū)數(shù)據(jù)特別多
????resultBatch.repartition(500).foreachPartition(record?=>?{
??????val?list?=?new?ListBuffer[Person]
??????record.foreach(person?=>?{
????????val?name?=?Person.name
????????val?age?=?Person.age
????????list.append(Person(name,age))
??????})
??????upsertDateMatch(list)?//執(zhí)行批量插入數(shù)據(jù)
????})
????//?批量插入MySQL的方法
????def?upsertPerson(list:?ListBuffer[Person]):?Unit?=?{
??????var?connect:?Connection?=?null
??????var?pstmt:?PreparedStatement?=?null
??????try?{
????????connect?=?JDBCUtils.getConnection()
????????//?禁用自動(dòng)提交
????????connect.setAutoCommit(false)
????????val?sql?=?"REPLACE?INTO?`person`(name,?age)"?+
??????????"?VALUES(?,??)"
????????pstmt?=?connect.prepareStatement(sql)
????????var?batchIndex?=?0
????????for?(person?<-?list)?{
??????????pstmt.setString(1,?person.name)
??????????pstmt.setString(2,?person.age)
??????????//?加入批次
??????????pstmt.addBatch()
??????????batchIndex?+=1
??????????//?控制提交的數(shù)量,
??????????// MySQL的批量寫(xiě)入盡量限制提交批次的數(shù)據(jù)量,否則會(huì)把MySQL寫(xiě)掛!?。?/span>
??????????if(batchIndex?%?1000?==?0?&&?batchIndex?!=0){
????????????pstmt.executeBatch()
????????????pstmt.clearBatch()
??????????}
????????}
????????//?提交批次
????????pstmt.executeBatch()
????????connect.commit()
??????}?catch?{
????????case?e:?Exception?=>
??????????e.printStackTrace()
??????}?finally?{
????????JDBCUtils.closeConnection(connect,?pstmt)
??????}
????}
????spark.close()
??}
}
JDBC連接工具類(lèi):
object?JDBCUtils?{
??val?user?=?"root"
??val?password?=?"root"
??val?url?=?"jdbc:mysql://localhost:3306/mydb"
??Class.forName("com.mysql.jdbc.Driver")
??//?獲取連接
??def?getConnection()?=?{
????DriverManager.getConnection(url,user,password)
??}
//?釋放連接
??def?closeConnection(connection:?Connection,?pstmt:?PreparedStatement):?Unit?=?{
????try?{
??????if?(pstmt?!=?null)?{
????????pstmt.close()
??????}
????}?catch?{
??????case?e:?Exception?=>?e.printStackTrace()
????}?finally?{
??????if?(connection?!=?null)?{
????????connection.close()
??????}
????}
??}
}
總結(jié)
Spark寫(xiě)入大量數(shù)據(jù)到MySQL時(shí),在寫(xiě)入之前盡量對(duì)寫(xiě)入的DF進(jìn)行重分區(qū)處理,避免分區(qū)內(nèi)數(shù)據(jù)過(guò)多。在寫(xiě)入時(shí),要注意使用foreachPartition來(lái)進(jìn)行寫(xiě)入,這樣可以為每一個(gè)分區(qū)獲取一個(gè)連接,在分區(qū)內(nèi)部設(shè)定批次提交,提交的批次不易過(guò)大,以免將數(shù)據(jù)庫(kù)寫(xiě)掛。
