1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        【機(jī)器學(xué)習(xí)】在機(jī)器學(xué)習(xí)中處理大量數(shù)據(jù)!

        共 12741字,需瀏覽 26分鐘

         ·

        2021-03-25 11:54

        作者:牧小熊,華中農(nóng)業(yè)大學(xué),Datawhale成員

        知乎 | https://zhuanlan.zhihu.com/p/357361005

        之前系統(tǒng)梳理過大數(shù)據(jù)概念和基礎(chǔ)知識(可點(diǎn)擊),本文基于PySpark在機(jī)器學(xué)習(xí)實(shí)踐中的用法,希望對大數(shù)據(jù)學(xué)習(xí)的同學(xué)起到拋磚引玉的作用。(當(dāng)數(shù)據(jù)集較小時,用Pandas足夠,當(dāng)數(shù)據(jù)量較大時,就需要利用分布式數(shù)據(jù)處理工具,Spark很適用)

        1.PySpark簡介

        Apache Spark是一個閃電般快速的實(shí)時處理框架。它進(jìn)行內(nèi)存計(jì)算以實(shí)時分析數(shù)據(jù)。由于Apache Hadoop MapReduce僅執(zhí)行批處理并且缺乏實(shí)時處理功能,因此它開始出現(xiàn)。因此,引入了Apache Spark,因?yàn)樗梢詫?shí)時執(zhí)行流處理,也可以處理批處理。

        Apache Spark是Scala語言實(shí)現(xiàn)的一個計(jì)算框架。為了支持Python語言使用Spark,Apache Spark社區(qū)開發(fā)了一個工具PySpark。我們可以通過Python語言操作RDDs

        RDD簡介

        RDD (Resiliennt Distributed Datasets)

        ?RDD = 彈性 + 分布式 Datasets

        1)分布式,好處是讓數(shù)據(jù)在不同工作節(jié)點(diǎn)并行存儲,并行計(jì)算

        2)彈性,指的節(jié)點(diǎn)存儲時,既可以使用內(nèi)存,也可以使用外存

        ?RDD還有個特性是延遲計(jì)算,也就是一個完整的RDD運(yùn)行任務(wù)分成兩部分:Transformation和Action

        Spark RDD的特性:

        • 分布式:可以分布在多臺機(jī)器上進(jìn)行并行處理
        • 彈性:計(jì)算過程中內(nèi)存不夠時,它會和磁盤進(jìn)行數(shù)據(jù)交換
        • 基于內(nèi)存:可以全部或部分緩存在內(nèi)存中
        • 只讀:不能修改,只能通過轉(zhuǎn)換操作生成新的 RDD

        2.Pandas和PySpark對比

        可以參考這位作者的,詳細(xì)的介紹了pyspark與pandas之間的區(qū)別:
        https://link.zhihu.com/?target=https%3A//blog.csdn.net/suzyu12345/article/details/79673483

        3.PySpark實(shí)戰(zhàn)小練

        數(shù)據(jù)集:從1994年人口普查數(shù)據(jù)庫中提取。(后臺回復(fù)“210323”可獲取)

        TO DO:預(yù)測一個人新收入是否會超過5萬美金

        參數(shù)說明:

        創(chuàng)建SparkSession

        from pyspark.sql import SparkSession
        spark=SparkSession.builder.appName('adult').getOrCreate()

        讀取數(shù)據(jù)

        df = spark.read.csv('adult.csv', inferSchema = True, header=True#讀取csv文件
        df.show(3)  #用來顯示前3行

        注意:pyspark必須創(chuàng)建SparkSession才能像類似于pandas一樣操作數(shù)據(jù)集

        我們看看數(shù)據(jù)集:

        cols = df.columns #和pandas一樣看列名
        df.printSchema()

        root
        |-- age: integer (nullable = true)
        |-- workclass: string (nullable = true)
        |-- fnlwgt: integer (nullable = true)
        |-- education: string (nullable = true)
        |-- education-num: integer (nullable = true)
        |-- marital-status: string (nullable = true)
        |-- occupation: string (nullable = true)
        |-- relationship: string (nullable = true)
        |-- race: string (nullable = true)
        |-- sex: string (nullable = true)
        |-- capital-gain: integer (nullable = true)
        |-- capital-loss: integer (nullable = true)
        |-- hours-per-week: integer (nullable = true)
        |-- native-country: string (nullable = true)
        |-- income: string (nullable = true)

        #找到所有的string類型的變量
        #dtypes用來看數(shù)據(jù)變量類型
        cat_features = [item[0for item in df.dtypes if item[1]=='string']
        # 需要刪除 income列,否則標(biāo)簽泄露
        cat_features.remove('income')
        #找到所有數(shù)字變量
        num_features = [item[0for item in df.dtypes if item[1]!='string']

        對于類別變量我們需要進(jìn)行編碼,在pyspark中提供了StringIndexer, OneHotEncoder, VectorAssembler特征編碼模式:

        from pyspark.ml import Pipeline
        from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

        stages = []
        for col in cat_features:
            # 字符串轉(zhuǎn)成索引
            string_index = StringIndexer(inputCol = col, outputCol = col + 'Index')
            # 轉(zhuǎn)換為OneHot編碼
            encoder = OneHotEncoder(inputCols=[string_index.getOutputCol()], outputCols=[col + "_one_hot"])
            # 將每個字段的轉(zhuǎn)換方式 放到stages中
            stages += [string_index, encoder]

        # 將income轉(zhuǎn)換為索引
        label_string_index = StringIndexer(inputCol = 'income', outputCol = 'label')
        # 添加到stages中
        stages += [label_string_index]

        # 類別變量 + 數(shù)值變量
        assembler_cols = [c + "_one_hot" for c in cat_features] + num_features
        assembler = VectorAssembler(inputCols=assembler_cols, outputCol="features")
        stages += [assembler]

        # 使用pipeline完成數(shù)據(jù)處理
        pipeline = Pipeline(stages=stages)
        pipeline_model = pipeline.fit(df)
        df = pipeline_model.transform(df)
        selected_cols = ["label""features"] + cols
        df = df.select(selected_cols)

        因?yàn)閜yspark顯示的數(shù)據(jù)比較像Mysql 那樣不方便觀看,因此我們轉(zhuǎn)成pandas:

        import pandas as pd
        pd.DataFrame(df.take(20), columns = df.columns)

        通過pandas發(fā)現(xiàn),好像還有較多字符串變量,難道特征編碼失敗了?

        原來是使用VectorAssembler直接將特征轉(zhuǎn)成了features這一列,pyspark做ML時 需要特征編碼好了并做成向量列,

        到這里,數(shù)據(jù)的特征工程就做好了。

        分割數(shù)據(jù)集 測試集

        train, test = df.randomSplit([0.70.3], seed=2021)
        print(train.count())
        print(test.count())

        22795
        9766

        可以看到,訓(xùn)練集和測試集安裝7:3的比例分割了,接下來就是構(gòu)建模型進(jìn)行訓(xùn)練。

        邏輯回歸

        from pyspark.ml.classification import LogisticRegression
        # 創(chuàng)建模型
        lr = LogisticRegression(featuresCol = 'features', labelCol = 'label',maxIter=10)
        lr_model = lr.fit(train)

        可以看到ML的用法和sklearn非常的像,因此使用起來也是相當(dāng)?shù)姆奖恪?/p>

        #結(jié)果預(yù)測

        predictions = lr_model.transform(test)

        看看predictions的結(jié)構(gòu)

        predictions.printSchema()

        root
        |-- label: double (nullable = false)
        |-- features: vector (nullable = true)
        |-- age: integer (nullable = true)
        |-- workclass: string (nullable = true)
        |-- fnlwgt: integer (nullable = true)
        |-- education: string (nullable = true)
        |-- education-num: integer (nullable = true)
        |-- marital-status: string (nullable = true)
        |-- occupation: string (nullable = true)
        |-- relationship: string (nullable = true)
        |-- race: string (nullable = true)
        |-- sex: string (nullable = true)
        |-- capital-gain: integer (nullable = true)
        |-- capital-loss: integer (nullable = true)
        |-- hours-per-week: integer (nullable = true)
        |-- native-country: string (nullable = true)
        |-- income: string (nullable = true)
        |-- rawPrediction: vector (nullable = true)
        |-- probability: vector (nullable = true)
        |-- prediction: double (nullable = false)

        抽取需要的信息

        selected = predictions.select("label""prediction""probability""age""occupation")
        display(selected)
        selected.show(4)

        技術(shù)AUC值

        from pyspark.ml.evaluation import BinaryClassificationEvaluator
        # 模型評估,通過原始數(shù)據(jù) rawPrediction計(jì)算AUC
        evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
        print('AUC:', evaluator.evaluate(predictions))

        AUC:0.9062153434371653

        進(jìn)行網(wǎng)格調(diào)參

        from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

        # 創(chuàng)建網(wǎng)絡(luò)參數(shù),用于交叉驗(yàn)證
        param_grid = (ParamGridBuilder()
                     .addGrid(lr.regParam, [0.010.52.0])
                     .addGrid(lr.elasticNetParam, [0.00.51.0])
                     .addGrid(lr.maxIter, [1510])
                     .build())
        # 五折交叉驗(yàn)證,設(shè)置模型,網(wǎng)格參數(shù),驗(yàn)證方法,折數(shù)
        cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)
        # 交叉驗(yàn)證運(yùn)行
        cv_model = cv.fit(train)
        # 對于測試數(shù)據(jù),使用五折交叉驗(yàn)證
        predictions = cv_model.transform(test)
        print('AUC:', evaluator.evaluate(predictions))

        AUC:0.9054096433333642

        決策樹模型

        from pyspark.ml.classification import DecisionTreeClassifier

        # 創(chuàng)建決策樹模型
        dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
        dt_model = dt.fit(train)

        #查看決策樹結(jié)構(gòu)
        print(dt_model._call_java('toDebugString'))

        DecisionTreeClassificationModel: uid=DecisionTreeClassifier_4bd113e9a3c2, depth=3, numNodes=11, numClasses=2, numFeatures=100
        If (feature 23 in {0.0})
        If (feature 97 <= 7792.0)
        Predict: 0.0
        Else (feature 97 > 7792.0)
        If (feature 94 <= 19.5)
        Predict: 0.0
        Else (feature 94 > 19.5)
        Predict: 1.0
        Else (feature 23 not in {0.0})
        If (feature 96 <= 12.5)
        If (feature 97 <= 3368.0)
        Predict: 0.0
        Else (feature 97 > 3368.0)
        Predict: 1.0
        Else (feature 96 > 12.5)
        Predict: 1.0

        predictions = dt_model.transform(test)
        predictions.printSchema()

        root
        |-- label: double (nullable = false)
        |-- features: vector (nullable = true)
        |-- age: integer (nullable = true)
        |-- workclass: string (nullable = true)
        |-- fnlwgt: integer (nullable = true)
        |-- education: string (nullable = true)
        |-- education-num: integer (nullable = true)
        |-- marital-status: string (nullable = true)
        |-- occupation: string (nullable = true)
        |-- relationship: string (nullable = true)
        |-- race: string (nullable = true)
        |-- sex: string (nullable = true)
        |-- capital-gain: integer (nullable = true)
        |-- capital-loss: integer (nullable = true)
        |-- hours-per-week: integer (nullable = true)
        |-- native-country: string (nullable = true)
        |-- income: string (nullable = true)
        |-- rawPrediction: vector (nullable = true)
        |-- probability: vector (nullable = true)
        |-- prediction: double (nullable = false)

        #計(jì)算AUC值

        from pyspark.ml.evaluation import BinaryClassificationEvaluator

        evaluator = BinaryClassificationEvaluator()
        evaluator.evaluate(predictions)

        0.7455098804457034

        網(wǎng)格搜參數(shù)

        from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
        param_grid = (ParamGridBuilder()
                     .addGrid(dt.maxDepth, [12610])
                     .addGrid(dt.maxBins, [204080])
                     .build())
        # 設(shè)置五折交叉驗(yàn)證
        cv = CrossValidator(estimator=dt, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)
        # 運(yùn)行cv
        cv_model = cv.fit(train)
        # 查看最優(yōu)模型
        print("numNodes = ", cv_model.bestModel.numNodes)
        print("depth = ", cv_model.bestModel.depth)

        numNodes = 429
        depth = 10

        # 使用五折交叉驗(yàn)證進(jìn)行預(yù)測
        predictions = cv_model.transform(test)
        evaluator.evaluate(predictions)

        0.7850384321616918

        隨機(jī)森林

        from pyspark.ml.classification import RandomForestClassifier
        # 隨機(jī)森林
        rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
        rf_model = rf.fit(train)
        predictions = rf_model.transform(test)
        predictions.printSchema()
        selected = predictions.select("label""prediction""probability""age""occupation")
        display(selected)

        evaluator = BinaryClassificationEvaluator()
        evaluator.evaluate(predictions)

        0.8932162982538805

        一樣的方法

        param_grid = (ParamGridBuilder()
                     .addGrid(rf.maxDepth, [357])
                     .addGrid(rf.maxBins, [2050])
                     .addGrid(rf.numTrees, [510])
                     .build())
        cv = CrossValidator(estimator=rf, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)
        # 運(yùn)行CV(大約6分鐘)
        cv_model = cv.fit(train)
        predictions = cv_model.transform(test)
        evaluator.evaluate(predictions)

        0.8948376797236669

        查看模型的結(jié)構(gòu)和特征重要性

        best_model

        RandomForestClassificationModel: uid=RandomForestClassifier_15bbbdd6642a, numTrees=10, numClasses=2, numFeatures=100

        best_model.featureImportances

        太長了省略

        df.schema['features'].metadata
        temp = df.schema["features"].metadata["ml_attr"]["attrs"]
        df_importance = pd.DataFrame(columns=['idx''name'])
        for attr in temp['numeric']:
            temp_df = {}
            temp_df['idx'] = attr['idx']
            temp_df['name'] = attr['name']
            #print(temp_df)
            df_importance = df_importance.append(temp_df, ignore_index=True)
            #print(attr['idx'], attr['name'])
            #print(attr)
            #break
        df_importance

        for attr in temp['binary']:
            temp_df = {}
            temp_df['idx'] = attr['idx']
            temp_df['name'] = attr['name']
            df_importance = df_importance.append(temp_df, ignore_index=True)
        df_importance

        對特征重要性進(jìn)行可視化

        df_temp = pd.DataFrame(best_model.featureImportances.toArray())
        df_temp.columns = ['feature_importance']
        df_importance = df_importance.merge(df_temp, left_index=True, right_index=True)
        df_importance.sort_values(by=['feature_importance'], ascending=False, inplace=True)
        df_importance

        4.小結(jié)

        本節(jié)選用了一個常規(guī)的數(shù)據(jù)集,需要通過UCI提供的數(shù)據(jù)預(yù)測個人收入是否會大于5萬,本節(jié)用PySpark對數(shù)據(jù)進(jìn)行了讀取,特征的編碼以及特征的構(gòu)建,并分別使用了邏輯回歸、決策樹以及隨機(jī)森林算法展示數(shù)據(jù)預(yù)測的過程。

        spark通過封裝成pyspark后使用難度降低了很多,而且pyspark的ML包提供了基本的機(jī)器學(xué)習(xí)模型,可以直接使用,模型的使用方法和sklearn比較相似,因此學(xué)習(xí)成本較低。

        往期精彩回顧





        本站qq群851320808,加入微信群請掃碼:

        瀏覽 53
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            日韩无码家庭乱伦 | 成人三级经典 | 91视频成人 | 手机无码视频在线观看 | 涩涩黄片 | 国产精品秘 一区二区三区高潮 | 中文字幕三级电影 | 亚洲中文日韩欧美综合版 | 美女被下药脱了衣服糟蹋 | 天堂色 |