1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        百萬(wàn)級(jí)數(shù)據(jù)批量讀寫(xiě)入MySQL

        共 2766字,需瀏覽 6分鐘

         ·

        2020-11-04 09:11

        Spark SQL讀取MySQL的方式

        Spark SQL還包括一個(gè)可以使用JDBC從其他數(shù)據(jù)庫(kù)讀取數(shù)據(jù)的數(shù)據(jù)源。與使用JdbcRDD相比,應(yīng)優(yōu)先使用此功能。這是因?yàn)榻Y(jié)果作為DataFrame返回,它們可以在Spark SQL中輕松處理或與其他數(shù)據(jù)源連接。JDBC數(shù)據(jù)源也更易于使用Java或Python,因?yàn)樗恍枰脩籼峁〤lassTag。

        可以使用Data Sources API將遠(yuǎn)程數(shù)據(jù)庫(kù)中的表加載為DataFrame或Spark SQL臨時(shí)視圖。用戶可以在數(shù)據(jù)源選項(xiàng)中指定JDBC連接屬性。user和password通常作為用于登錄數(shù)據(jù)源的連接屬性。除連接屬性外,Spark還支持以下不區(qū)分大小寫(xiě)的選項(xiàng):

        屬性名稱解釋
        url要連接的JDBC URL
        dbtable讀取或?qū)懭氲腏DBC表
        query指定查詢語(yǔ)句
        driver用于連接到該URL的JDBC驅(qū)動(dòng)類(lèi)名
        partitionColumn, lowerBound, upperBound如果指定了這些選項(xiàng),則必須全部指定。另外, numPartitions必須指定
        numPartitions表讀寫(xiě)中可用于并行處理的最大分區(qū)數(shù)。這也確定了并發(fā)JDBC連接的最大數(shù)量。如果要寫(xiě)入的分區(qū)數(shù)超過(guò)此限制,我們可以通過(guò)coalesce(numPartitions)在寫(xiě)入之前進(jìn)行調(diào)用將其降低到此限制
        queryTimeout默認(rèn)為0,查詢超時(shí)時(shí)間
        fetchsizeJDBC的獲取大小,它確定每次要獲取多少行。這可以幫助提高JDBC驅(qū)動(dòng)程序的性能
        batchsize默認(rèn)為1000,JDBC批處理大小,這可以幫助提高JDBC驅(qū)動(dòng)程序的性能。
        isolationLevel事務(wù)隔離級(jí)別,適用于當(dāng)前連接。它可以是一個(gè)NONE,READ_COMMITTEDREAD_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,對(duì)應(yīng)于由JDBC的連接對(duì)象定義,缺省值為標(biāo)準(zhǔn)事務(wù)隔離級(jí)別READ_UNCOMMITTED。此選項(xiàng)僅適用于寫(xiě)作。
        sessionInitStatement在向遠(yuǎn)程數(shù)據(jù)庫(kù)打開(kāi)每個(gè)數(shù)據(jù)庫(kù)會(huì)話之后,在開(kāi)始讀取數(shù)據(jù)之前,此選項(xiàng)將執(zhí)行自定義SQL語(yǔ)句,使用它來(lái)實(shí)現(xiàn)會(huì)話初始化代碼。
        truncate這是與JDBC writer相關(guān)的選項(xiàng)。當(dāng)SaveMode.Overwrite啟用時(shí),就會(huì)清空目標(biāo)表的內(nèi)容,而不是刪除和重建其現(xiàn)有的表。默認(rèn)為false
        pushDownPredicate用于啟用或禁用謂詞下推到JDBC數(shù)據(jù)源的選項(xiàng)。默認(rèn)值為true,在這種情況下,Spark將盡可能將過(guò)濾器下推到JDBC數(shù)據(jù)源。

        源碼

        • SparkSession
        /**
        ???*?Returns?a?[[DataFrameReader]]?that?can?be?used?to?read?non-streaming?data?in?as?a
        ???*?`DataFrame`.
        ???*?{{{
        ???*???sparkSession.read.parquet("/path/to/file.parquet")
        ???*???sparkSession.read.schema(schema).json("/path/to/file.json")
        ???*?}}}
        ???*
        ???*?@since?2.0.0
        ???*/

        ??def?read:?DataFrameReader?=?new?DataFrameReader(self)
        • DataFrameReader
        ??//?...省略代碼...
        ??/**
        ???*所有的數(shù)據(jù)由RDD的一個(gè)分區(qū)處理,如果你這個(gè)表很大,很可能會(huì)出現(xiàn)OOM
        ???*可以使用DataFrameDF.rdd.partitions.size方法查看
        ???*/

        ??def?jdbc(url:?String,?table:?String,?properties:?Properties):?DataFrame?=?{
        ????assertNoSpecifiedSchema("jdbc")
        ????this.extraOptions?++=?properties.asScala
        ????this.extraOptions?+=?(JDBCOptions.JDBC_URL?->?url,?JDBCOptions.JDBC_TABLE_NAME?->?table)
        ????format("jdbc").load()
        ??}
        /**
        ???*?@param?url?數(shù)據(jù)庫(kù)url
        ???*?@param?table?表名
        ???*?@param?columnName?分區(qū)字段名
        ???*?@param?lowerBound??`columnName`的最小值,用于分區(qū)步長(zhǎng)
        ???*?@param?upperBound??`columnName`的最大值,用于分區(qū)步長(zhǎng).
        ???*?@param?numPartitions?分區(qū)數(shù)量?
        ???*?@param?connectionProperties?其他參數(shù)
        ???*?@since?1.4.0
        ???*/

        ??def?jdbc(
        ??????url:?String,
        ??????table:?String,
        ??????columnName:?String,
        ??????lowerBound:?Long,
        ??????upperBound:?Long,
        ??????numPartitions:?Int,
        ??????connectionProperties:?Properties):?DataFrame?=?{
        ????this.extraOptions?++=?Map(
        ??????JDBCOptions.JDBC_PARTITION_COLUMN?->?columnName,
        ??????JDBCOptions.JDBC_LOWER_BOUND?->?lowerBound.toString,
        ??????JDBCOptions.JDBC_UPPER_BOUND?->?upperBound.toString,
        ??????JDBCOptions.JDBC_NUM_PARTITIONS?->?numPartitions.toString)
        ????jdbc(url,?table,?connectionProperties)
        ??}

        ??/**
        ???*?@param?predicates?每個(gè)分區(qū)的where條件
        ???*?比如:"id <= 1000", "score > 1000 and score <= 2000"
        ???*?將會(huì)分成兩個(gè)分區(qū)
        ???*?@since?1.4.0
        ???*/

        ??def?jdbc(
        ??????url:?String,
        ??????table:?String,
        ??????predicates:?Array[String],
        ??????connectionProperties:?Properties):?DataFrame?=?{
        ????assertNoSpecifiedSchema("jdbc")
        ????val?params?=?extraOptions.toMap?++?connectionProperties.asScala.toMap
        ????val?options?=?new?JDBCOptions(url,?table,?params)
        ????val?parts:?Array[Partition]?=?predicates.zipWithIndex.map?{?case?(part,?i)?=>
        ??????JDBCPartition(part,?i)?:?Partition
        ????}
        ????val?relation?=?JDBCRelation(parts,?options)(sparkSession)
        ????sparkSession.baseRelationToDataFrame(relation)
        ??}

        示例

        ?private?def?runJdbcDatasetExample(spark:?SparkSession):?Unit?=?{
        ????
        ????//?從JDBC?source加載數(shù)據(jù)(load)
        ????val?jdbcDF?=?spark.read
        ??????.format("jdbc")
        ??????.option("url",?"jdbc:mysql://127.0.0.1:3306/test")
        ??????.option("dbtable",?"mytable")
        ??????.option("user",?"root")
        ??????.option("password",?"root")
        ??????.load()

        ????val?connectionProperties?=?new?Properties()
        ????connectionProperties.put("user",?"root")
        ????connectionProperties.put("password",?"root")
        ????val?jdbcDF2?=?spark.read
        ??????.jdbc("jdbc:mysql://127.0.0.1:3306/test",?"mytable",?connectionProperties)
        ????//?指定讀取schema的數(shù)據(jù)類(lèi)型
        ????connectionProperties.put("customSchema",?"id?DECIMAL(38,?0),?name?STRING")
        ????val?jdbcDF3?=?spark.read
        ??????.jdbc("jdbc:mysql://127.0.0.1:3306/test",?"mytable",?connectionProperties)

        ??}

        值得注意的是,上面的方式如果不指定分區(qū)的話,Spark默認(rèn)會(huì)使用一個(gè)分區(qū)讀取數(shù)據(jù),這樣在數(shù)據(jù)量特別大的情況下,會(huì)出現(xiàn)OOM。在讀取數(shù)據(jù)之后,調(diào)用DataFrameDF.rdd.partitions.size方法可以查看分區(qū)數(shù)。

        Spark SQL批量寫(xiě)入MySQL

        代碼示例如下:

        object?BatchInsertMySQL?{
        ??case?class?Person(name:?String,?age:?Int)
        ??def?main(args:?Array[String]):?Unit?=?{

        ????//?創(chuàng)建sparkSession對(duì)象
        ????val?conf?=?new?SparkConf()
        ??????.setAppName("BatchInsertMySQL")
        ????val?spark:?SparkSession?=??SparkSession.builder()
        ??????.config(conf)
        ??????.getOrCreate()
        ????import?spark.implicits._
        ????//?MySQL連接參數(shù)
        ????val?url?=?JDBCUtils.url
        ????val?user?=?JDBCUtils.user
        ????val?pwd?=?JDBCUtils.password

        ????//?創(chuàng)建Properties對(duì)象,設(shè)置連接mysql的用戶名和密碼
        ????val?properties:?Properties?=?new?Properties()

        ????properties.setProperty("user",?user)?//?用戶名
        ????properties.setProperty("password",?pwd)?//?密碼
        ????properties.setProperty("driver",?"com.mysql.jdbc.Driver")
        ????properties.setProperty("numPartitions","10")

        ????//?讀取mysql中的表數(shù)據(jù)
        ????val?testDF:?DataFrame?=?spark.read.jdbc(url,?"test",?properties)
        ?????println("testDF的分區(qū)數(shù):??"?+?testDF.rdd.partitions.size)
        ???testDF.createOrReplaceTempView("test")
        ???testDF.persist(StorageLevel.MEMORY_AND_DISK)
        ???testDF.printSchema()

        ????val?result?=
        ??????s"""--?SQL代碼
        ???????????????"
        "".stripMargin

        ????val?resultBatch?=?spark.sql(result).as[Person]
        ????println("resultBatch的分區(qū)數(shù):?"?+?resultBatch.rdd.partitions.size)

        ????//?批量寫(xiě)入MySQL
        ????//?此處最好對(duì)處理的結(jié)果進(jìn)行一次重分區(qū)
        ????//?由于數(shù)據(jù)量特別大,會(huì)造成每個(gè)分區(qū)數(shù)據(jù)特別多
        ????resultBatch.repartition(500).foreachPartition(record?=>?{

        ??????val?list?=?new?ListBuffer[Person]
        ??????record.foreach(person?=>?{
        ????????val?name?=?Person.name
        ????????val?age?=?Person.age
        ????????list.append(Person(name,age))
        ??????})
        ??????upsertDateMatch(list)?//執(zhí)行批量插入數(shù)據(jù)
        ????})
        ????//?批量插入MySQL的方法
        ????def?upsertPerson(list:?ListBuffer[Person]):?Unit?=?{

        ??????var?connect:?Connection?=?null
        ??????var?pstmt:?PreparedStatement?=?null

        ??????try?{
        ????????connect?=?JDBCUtils.getConnection()
        ????????//?禁用自動(dòng)提交
        ????????connect.setAutoCommit(false)

        ????????val?sql?=?"REPLACE?INTO?`person`(name,?age)"?+
        ??????????"?VALUES(?,??)"

        ????????pstmt?=?connect.prepareStatement(sql)

        ????????var?batchIndex?=?0
        ????????for?(person?<-?list)?{
        ??????????pstmt.setString(1,?person.name)
        ??????????pstmt.setString(2,?person.age)
        ??????????//?加入批次
        ??????????pstmt.addBatch()
        ??????????batchIndex?+=1
        ??????????//?控制提交的數(shù)量,
        ??????????// MySQL的批量寫(xiě)入盡量限制提交批次的數(shù)據(jù)量,否則會(huì)把MySQL寫(xiě)掛!?。?/span>
        ??????????if(batchIndex?%?1000?==?0?&&?batchIndex?!=0){
        ????????????pstmt.executeBatch()
        ????????????pstmt.clearBatch()
        ??????????}

        ????????}
        ????????//?提交批次
        ????????pstmt.executeBatch()
        ????????connect.commit()
        ??????}?catch?{
        ????????case?e:?Exception?=>
        ??????????e.printStackTrace()
        ??????}?finally?{
        ????????JDBCUtils.closeConnection(connect,?pstmt)
        ??????}
        ????}

        ????spark.close()
        ??}
        }

        JDBC連接工具類(lèi):

        object?JDBCUtils?{
        ??val?user?=?"root"
        ??val?password?=?"root"
        ??val?url?=?"jdbc:mysql://localhost:3306/mydb"
        ??Class.forName("com.mysql.jdbc.Driver")
        ??//?獲取連接
        ??def?getConnection()?=?{
        ????DriverManager.getConnection(url,user,password)
        ??}
        //?釋放連接
        ??def?closeConnection(connection:?Connection,?pstmt:?PreparedStatement):?Unit?=?{
        ????try?{
        ??????if?(pstmt?!=?null)?{
        ????????pstmt.close()
        ??????}
        ????}?catch?{
        ??????case?e:?Exception?=>?e.printStackTrace()
        ????}?finally?{
        ??????if?(connection?!=?null)?{
        ????????connection.close()
        ??????}
        ????}
        ??}
        }

        總結(jié)

        Spark寫(xiě)入大量數(shù)據(jù)到MySQL時(shí),在寫(xiě)入之前盡量對(duì)寫(xiě)入的DF進(jìn)行重分區(qū)處理,避免分區(qū)內(nèi)數(shù)據(jù)過(guò)多。在寫(xiě)入時(shí),要注意使用foreachPartition來(lái)進(jìn)行寫(xiě)入,這樣可以為每一個(gè)分區(qū)獲取一個(gè)連接,在分區(qū)內(nèi)部設(shè)定批次提交,提交的批次不易過(guò)大,以免將數(shù)據(jù)庫(kù)寫(xiě)掛。

        瀏覽 67
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            蜜桃视频91 | 久久福利国产 | 18 爽爽精品 国产 | 欧美成人性爱电影在线观看 | 国产欧洲精品亚洲午夜拍精品 | 国产91视频 | 男生操女生的动漫 | 中国老熟女重囗味HDXX | 变态撕了美女裙子摸下身 | 亚洲精品粉嫩小泬18p图片 |