1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        PySpark入門級(jí)學(xué)習(xí)教程,框架思維(上)

        共 11811字,需瀏覽 24分鐘

         ·

        2021-04-12 23:01


        為什么要學(xué)習(xí)Spark?作為數(shù)據(jù)從業(yè)者多年,個(gè)人覺得Spark已經(jīng)越來越走進(jìn)我們的日常工作了,無論是使用哪種編程語言,Python、Scala還是Java,都會(huì)或多或少接觸到Spark,它可以讓我們能夠用到集群的力量,可以對BigData進(jìn)行高效操作,實(shí)現(xiàn)很多之前由于計(jì)算資源而無法輕易實(shí)現(xiàn)的東西。網(wǎng)上有很多關(guān)于Spark的好處,這里就不做過多的贅述,我們直接進(jìn)入這篇文章的正文!

        關(guān)于PySpark,我們知道它是Python調(diào)用Spark的接口,我們可以通過調(diào)用Python API的方式來編寫Spark程序,它支持了大多數(shù)的Spark功能,比如SparkDataFrame、Spark SQL、Streaming、MLlib等等。只要我們了解Python的基本語法,那么在Python里調(diào)用Spark的力量就顯得十分easy了。下面我將會(huì)從相對宏觀的層面介紹一下PySpark,讓我們對于這個(gè)神器有一個(gè)框架性的認(rèn)識(shí),知道它能干什么,知道去哪里尋找問題解答,爭取看完這篇文章可以讓我們更加絲滑地入門PySpark。話不多說,馬上開始!

        ?? 目錄:

        • 安裝指引
        • 基礎(chǔ)概念
        • 常用函數(shù)
        • Sparksql使用
        • 調(diào)優(yōu)思路
        • 學(xué)習(xí)資源推薦

        ?? 安裝指引:

        安裝這塊本文就不展開具體的步驟了,畢竟大家的機(jī)子環(huán)境都不盡相同。不過可以簡單說幾點(diǎn)重要的步驟,然后節(jié)末放上一些安裝示例供大家參考。

        1)要使用PySpark,機(jī)子上要有Java開發(fā)環(huán)境
        2)環(huán)境變量記得要配置完整
        3)Mac下的/usr/local/ 路徑一般是隱藏的,PyCharm配置py4j和pyspark的時(shí)候可以使用 shift+command+G 來使用路徑訪問。
        4)Mac下如果修改了 ~/.bash_profile 的話,記得要重啟下PyCharm才會(huì)生效的哈
        5)版本記得要搞對,保險(xiǎn)起見Java的jdk版本選擇低版本(別問我為什么知道),我選擇的是Java8.

        下面是一些示例,可以參考下:

        1)Mac下安裝spark,并配置pycharm-pyspark完整教程

        https://blog.csdn.net/shiyutianming/article/details/99946797

        2)virtualBox里安裝開發(fā)環(huán)境

        https://www.bilibili.com/video/BV1i4411i79a?p=3

        3)快速搭建spark開發(fā)環(huán)境,云哥項(xiàng)目

        https://github.com/lyhue1991/eat_pyspark_in_10_days

        ?? 基礎(chǔ)概念

        關(guān)于Spark的基礎(chǔ)概念,我在先前的文章里也有寫過,大家可以一起來回顧一下 《想學(xué)習(xí)Spark?先帶你了解一些基礎(chǔ)的知識(shí)》。作為補(bǔ)充,今天在這里也介紹一些在Spark中會(huì)經(jīng)常遇見的專有名詞。

        ???♀? Q1: 什么是RDD

        RDD的全稱是 Resilient Distributed Datasets,這是Spark的一種數(shù)據(jù)抽象集合,它可以被執(zhí)行在分布式的集群上進(jìn)行各種操作,而且有較強(qiáng)的容錯(cuò)機(jī)制。RDD可以被分為若干個(gè)分區(qū),每一個(gè)分區(qū)就是一個(gè)數(shù)據(jù)集片段,從而可以支持分布式計(jì)算。

        ???♀? Q2: RDD運(yùn)行時(shí)相關(guān)的關(guān)鍵名詞

        簡單來說可以有 Client、Job、Master、Worker、Driver、Stage、Task以及Executor,這幾個(gè)東西在調(diào)優(yōu)的時(shí)候也會(huì)經(jīng)常遇到的。

        Client:指的是客戶端進(jìn)程,主要負(fù)責(zé)提交job到Master;

        Job:Job來自于我們編寫的程序,Application包含一個(gè)或者多個(gè)job,job包含各種RDD操作;

        Master:指的是Standalone模式中的主控節(jié)點(diǎn),負(fù)責(zé)接收來自Client的job,并管理著worker,可以給worker分配任務(wù)和資源(主要是driver和executor資源);

        Worker:指的是Standalone模式中的slave節(jié)點(diǎn),負(fù)責(zé)管理本節(jié)點(diǎn)的資源,同時(shí)受Master管理,需要定期給Master回報(bào)heartbeat(心跳),啟動(dòng)Driver和Executor;

        Driver:指的是 job(作業(yè))的主進(jìn)程,一般每個(gè)Spark作業(yè)都會(huì)有一個(gè)Driver進(jìn)程,負(fù)責(zé)整個(gè)作業(yè)的運(yùn)行,包括了job的解析、Stage的生成、調(diào)度Task到Executor上去執(zhí)行;

        Stage:中文名 階段,是job的基本調(diào)度單位,因?yàn)槊總€(gè)job會(huì)分成若干組Task,每組任務(wù)就被稱為 Stage;

        Task:任務(wù),指的是直接運(yùn)行在executor上的東西,是executor上的一個(gè)線程;

        Executor:指的是 執(zhí)行器,顧名思義就是真正執(zhí)行任務(wù)的地方了,一個(gè)集群可以被配置若干個(gè)Executor,每個(gè)Executor接收來自Driver的Task,并執(zhí)行它(可同時(shí)執(zhí)行多個(gè)Task)。

        ???♀? Q3: 什么是DAG

        全稱是 Directed Acyclic Graph,中文名是有向無環(huán)圖。Spark就是借用了DAG對RDD之間的關(guān)系進(jìn)行了建模,用來描述RDD之間的因果依賴關(guān)系。因?yàn)樵谝粋€(gè)Spark作業(yè)調(diào)度中,多個(gè)作業(yè)任務(wù)之間也是相互依賴的,有些任務(wù)需要在一些任務(wù)執(zhí)行完成了才可以執(zhí)行的。在Spark調(diào)度中就是有DAGscheduler,它負(fù)責(zé)將job分成若干組Task組成的Stage。

        ???♀? Q4: Spark的部署模式有哪些

        主要有l(wèi)ocal模式、Standalone模式、Mesos模式、YARN模式。

        更多的解釋可以參考這位老哥的解釋。https://www.jianshu.com/p/3b8f85329664

        ???♀? Q5: Shuffle操作是什么

        Shuffle指的是數(shù)據(jù)從Map端到Reduce端的數(shù)據(jù)傳輸過程,Shuffle性能的高低直接會(huì)影響程序的性能。因?yàn)镽educe task需要跨節(jié)點(diǎn)去拉在分布在不同節(jié)點(diǎn)上的Map task計(jì)算結(jié)果,這一個(gè)過程是需要有磁盤IO消耗以及數(shù)據(jù)網(wǎng)絡(luò)傳輸?shù)南牡模孕枰鶕?jù)實(shí)際數(shù)據(jù)情況進(jìn)行適當(dāng)調(diào)整。另外,Shuffle可以分為兩部分,分別是Map階段的數(shù)據(jù)準(zhǔn)備與Reduce階段的數(shù)據(jù)拷貝處理,在Map端我們叫Shuffle Write,在Reduce端我們叫Shuffle Read。

        ???♀? Q6: 什么是惰性執(zhí)行

        這是RDD的一個(gè)特性,在RDD中的算子可以分為Transform算子和Action算子,其中Transform算子的操作都不會(huì)真正執(zhí)行,只會(huì)記錄一下依賴關(guān)系,直到遇見了Action算子,在這之前的所有Transform操作才會(huì)被觸發(fā)計(jì)算,這就是所謂的惰性執(zhí)行。具體哪些是Transform和Action算子,可以看下一節(jié)。

        ?? 常用函數(shù)

        從網(wǎng)友的總結(jié)來看比較常用的算子大概可以分為下面幾種,所以就演示一下這些算子,如果需要看更多的算子或者解釋,建議可以移步到官方API文檔去Search一下哈。

        pyspark.RDD:http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

        圖來自 edureka 的pyspark入門教程

        下面我們用自己創(chuàng)建的RDD:sc.parallelize(range(1,11),4)

        import os
        import pyspark
        from pyspark import SparkContext, SparkConf

        conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
        sc = SparkContext(conf=conf)

        # 使用 parallelize方法直接實(shí)例化一個(gè)RDD
        rdd = sc.parallelize(range(1,11),4# 這里的 4 指的是分區(qū)數(shù)量
        rdd.take(100)
        # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


        """
        ----------------------------------------------
                        Transform算子解析
        ----------------------------------------------
        """

        # 以下的操作由于是Transform操作,因?yàn)槲覀冃枰谧詈蠹由弦粋€(gè)collect算子用來觸發(fā)計(jì)算。
        # 1. map: 和python差不多,map轉(zhuǎn)換就是對每一個(gè)元素進(jìn)行一個(gè)映射
        rdd = sc.parallelize(range(111), 4)
        rdd_map = rdd.map(lambda x: x*2)
        print("原始數(shù)據(jù):", rdd.collect())
        print("擴(kuò)大2倍:", rdd_map.collect())
        # 原始數(shù)據(jù): [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        # 擴(kuò)大2倍: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

        # 2. flatMap: 這個(gè)相比于map多一個(gè)flat(壓平)操作,顧名思義就是要把高維的數(shù)組變成一維
        rdd2 = sc.parallelize(["hello SamShare""hello PySpark"])
        print("原始數(shù)據(jù):", rdd2.collect())
        print("直接split之后的map結(jié)果:", rdd2.map(lambda x: x.split(" ")).collect())
        print("直接split之后的flatMap結(jié)果:", rdd2.flatMap(lambda x: x.split(" ")).collect())
        # 直接split之后的map結(jié)果: [['hello', 'SamShare'], ['hello', 'PySpark']]
        # 直接split之后的flatMap結(jié)果: ['hello', 'SamShare', 'hello', 'PySpark']

        # 3. filter: 過濾數(shù)據(jù)
        rdd = sc.parallelize(range(111), 4)
        print("原始數(shù)據(jù):", rdd.collect())
        print("過濾奇數(shù):", rdd.filter(lambda x: x % 2 == 0).collect())
        # 原始數(shù)據(jù): [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        # 過濾奇數(shù): [2, 4, 6, 8, 10]

        # 4. distinct: 去重元素
        rdd = sc.parallelize([2248888163232])
        print("原始數(shù)據(jù):", rdd.collect())
        print("去重?cái)?shù)據(jù):", rdd.distinct().collect())
        # 原始數(shù)據(jù): [2, 2, 4, 8, 8, 8, 8, 16, 32, 32]
        # 去重?cái)?shù)據(jù): [4, 8, 16, 32, 2]

        # 5. reduceByKey: 根據(jù)key來映射數(shù)據(jù)
        from operator import add
        rdd = sc.parallelize([("a"1), ("b"1), ("a"1)])
        print("原始數(shù)據(jù):", rdd.collect())
        print("原始數(shù)據(jù):", rdd.reduceByKey(add).collect())
        # 原始數(shù)據(jù): [('a', 1), ('b', 1), ('a', 1)]
        # 原始數(shù)據(jù): [('b', 1), ('a', 2)]

        # 6. mapPartitions: 根據(jù)分區(qū)內(nèi)的數(shù)據(jù)進(jìn)行映射操作
        rdd = sc.parallelize([1234], 2)
        def f(iterator):
            yield sum(iterator)
        print(rdd.collect())
        print(rdd.mapPartitions(f).collect())
        # [1, 2, 3, 4]
        # [3, 7]

        # 7. sortBy: 根據(jù)規(guī)則進(jìn)行排序
        tmp = [('a'1), ('b'2), ('1'3), ('d'4), ('2'5)]
        print(sc.parallelize(tmp).sortBy(lambda x: x[0]).collect())
        print(sc.parallelize(tmp).sortBy(lambda x: x[1]).collect())
        # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        # [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

        # 8. subtract: 數(shù)據(jù)集相減, Return each value in self that is not contained in other.
        x = sc.parallelize([("a"1), ("b"4), ("b"5), ("a"3)])
        y = sc.parallelize([("a"3), ("c"None)])
        print(sorted(x.subtract(y).collect()))
        # [('a', 1), ('b', 4), ('b', 5)]

        # 9. union: 合并兩個(gè)RDD
        rdd = sc.parallelize([1123])
        print(rdd.union(rdd).collect())
        # [1, 1, 2, 3, 1, 1, 2, 3]

        # 10. intersection: 取兩個(gè)RDD的交集,同時(shí)有去重的功效
        rdd1 = sc.parallelize([110234523])
        rdd2 = sc.parallelize([162378])
        print(rdd1.intersection(rdd2).collect())
        # [1, 2, 3]

        # 11. cartesian: 生成笛卡爾積
        rdd = sc.parallelize([12])
        print(sorted(rdd.cartesian(rdd).collect()))
        # [(1, 1), (1, 2), (2, 1), (2, 2)]

        # 12. zip: 拉鏈合并,需要兩個(gè)RDD具有相同的長度以及分區(qū)數(shù)量
        x = sc.parallelize(range(05))
        y = sc.parallelize(range(10001005))
        print(x.collect())
        print(y.collect())
        print(x.zip(y).collect())
        # [0, 1, 2, 3, 4]
        # [1000, 1001, 1002, 1003, 1004]
        # [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

        # 13. zipWithIndex: 將RDD和一個(gè)從0開始的遞增序列按照拉鏈方式連接。
        rdd_name = sc.parallelize(["LiLei""Hanmeimei""Lily""Lucy""Ann""Dachui""RuHua"])
        rdd_index = rdd_name.zipWithIndex()
        print(rdd_index.collect())
        # [('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]

        # 14. groupByKey: 按照key來聚合數(shù)據(jù)
        rdd = sc.parallelize([("a"1), ("b"1), ("a"1)])
        print(rdd.collect())
        print(sorted(rdd.groupByKey().mapValues(len).collect()))
        print(sorted(rdd.groupByKey().mapValues(list).collect()))
        # [('a', 1), ('b', 1), ('a', 1)]
        # [('a', 2), ('b', 1)]
        # [('a', [1, 1]), ('b', [1])]

        # 15. sortByKey:
        tmp = [('a'1), ('b'2), ('1'3), ('d'4), ('2'5)]
        print(sc.parallelize(tmp).sortByKey(True1).collect())
        # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

        # 16. join:
        x = sc.parallelize([("a"1), ("b"4)])
        y = sc.parallelize([("a"2), ("a"3)])
        print(sorted(x.join(y).collect()))
        # [('a', (1, 2)), ('a', (1, 3))]

        # 17. leftOuterJoin/rightOuterJoin
        x = sc.parallelize([("a"1), ("b"4)])
        y = sc.parallelize([("a"2)])
        print(sorted(x.leftOuterJoin(y).collect()))
        # [('a', (1, 2)), ('b', (4, None))]

        """
        ----------------------------------------------
                        Action算子解析
        ----------------------------------------------
        """

        # 1. collect: 指的是把數(shù)據(jù)都匯集到driver端,便于后續(xù)的操作
        rdd = sc.parallelize(range(05))
        rdd_collect = rdd.collect()
        print(rdd_collect)
        # [0, 1, 2, 3, 4]

        # 2. first: 取第一個(gè)元素
        sc.parallelize([234]).first()
        # 2

        # 3. collectAsMap: 轉(zhuǎn)換為dict,使用這個(gè)要注意了,不要對大數(shù)據(jù)用,不然全部載入到driver端會(huì)爆內(nèi)存
        m = sc.parallelize([(12), (34)]).collectAsMap()
        m
        # {1: 2, 3: 4}

        # 4. reduce: 逐步對兩個(gè)元素進(jìn)行操作
        rdd = sc.parallelize(range(10),5)
        print(rdd.reduce(lambda x,y:x+y))
        # 45

        # 5. countByKey/countByValue:
        rdd = sc.parallelize([("a"1), ("b"1), ("a"1)])
        print(sorted(rdd.countByKey().items()))
        print(sorted(rdd.countByValue().items()))
        # [('a', 2), ('b', 1)]
        # [(('a', 1), 2), (('b', 1), 1)]

        # 6. take: 相當(dāng)于取幾個(gè)數(shù)據(jù)到driver端
        rdd = sc.parallelize([("a"1), ("b"1), ("a"1)])
        print(rdd.take(5))
        # [('a', 1), ('b', 1), ('a', 1)]

        # 7. saveAsTextFile: 保存rdd成text文件到本地
        text_file = "./data/rdd.txt"
        rdd = sc.parallelize(range(5))
        rdd.saveAsTextFile(text_file)

        # 8. takeSample: 隨機(jī)取數(shù)
        rdd = sc.textFile("./test/data/hello_samshare.txt"4)  # 這里的 4 指的是分區(qū)數(shù)量
        rdd_sample = rdd.takeSample(True20)  # withReplacement 參數(shù)1:代表是否是有放回抽樣
        rdd_sample

        # 9. foreach: 對每一個(gè)元素執(zhí)行某種操作,不生成新的RDD
        rdd = sc.parallelize(range(10), 5)
        accum = sc.accumulator(0)
        rdd.foreach(lambda x: accum.add(x))
        print(accum.value)
        # 45


        Sam:未完待續(xù)... 文章較長,分上下兩篇文章來寫哈。


        ??學(xué)習(xí)資源推薦:

        1)edureka about PySpark Tutorial
        印度老哥的課程,B站可直接看,不過口音略難聽懂不過還好有字幕。
        https://www.bilibili.com/video/BV1i4411i79a?p=1
        2)eat_pyspark_in_10_days
        梁云大哥的課程,講得超級(jí)清晰,建議精讀。
        https://github.com/lyhue1991/eat_pyspark_in_10_days
        3)官方文檔
        http://spark.apache.org/docs/latest/api/python/reference/index.html

        瀏覽 81
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            三级片精品播放 | 欧美日韩中文字幕在线视频 | 丁香五月综合视频 | 中文亚洲激情 | 美女啪啪啪免费网站 | 曰韩免费一区二区三区 | 成人国产精品秘 久久久春色 | rylskyart少妇人体 | 无遮挡国产高潮视频免费观看 | 国产美女被强躁到呻吟红视频 |