1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        2小時(shí)入門Spark之RDD編程

        共 11617字,需瀏覽 24分鐘

         ·

        2020-12-29 18:33

        公眾號(hào)后臺(tái)回復(fù)關(guān)鍵字:pyspark,獲取本項(xiàng)目github地址。

        本節(jié)將介紹RDD數(shù)據(jù)結(jié)構(gòu)的常用函數(shù)。包括如下內(nèi)容:

        • 創(chuàng)建RDD
        • 常用Action操作
        • 常用Transformation操作
        • 常用PairRDD的轉(zhuǎn)換操作
        • 緩存操作
        • 共享變量
        • 分區(qū)操作

        這些函數(shù)中,我最常用的是如下15個(gè)函數(shù),需要認(rèn)真掌握其用法。

        • map
        • flatMap
        • mapPartitions
        • filter
        • count
        • reduce
        • take
        • saveAsTextFile
        • collect
        • join
        • union
        • persist
        • repartition
        • reduceByKey
        • aggregateByKey

        import?findspark

        #指定spark_home為剛才的解壓路徑,指定python路徑
        spark_home?=?"/Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2"
        python_path?=?"/Users/liangyun/anaconda3/bin/python"
        findspark.init(spark_home,python_path)

        import?pyspark?
        from?pyspark?import?SparkContext,?SparkConf
        conf?=?SparkConf().setAppName("rdd_tutorial").setMaster("local[4]")
        sc?=?SparkContext(conf=conf)

        print(pyspark.__version__)
        3.0.1

        一,創(chuàng)建RDD

        創(chuàng)建RDD主要有兩種方式,一個(gè)是textFile加載本地或者集群文件系統(tǒng)中的數(shù)據(jù),

        第二個(gè)是用parallelize方法將Driver中的數(shù)據(jù)結(jié)構(gòu)并行化成RDD。

        #從本地文件系統(tǒng)中加載數(shù)據(jù)
        file?=?"./data/hello.txt"
        rdd?=?sc.textFile(file,3)
        rdd.collect()
        ['hello?world',
        ?'hello?spark',
        ?'spark?love?jupyter',
        ?'spark?love?pandas',
        ?'spark?love?sql']
        #從集群文件系統(tǒng)中加載數(shù)據(jù)
        #file?=?"hdfs://localhost:9000/user/hadoop/data.txt"
        #也可以省去hdfs://localhost:9000
        #rdd?=?sc.textFile(file,3)
        #parallelize將Driver中的數(shù)據(jù)結(jié)構(gòu)生成RDD,第二個(gè)參數(shù)指定分區(qū)數(shù)
        rdd?=?sc.parallelize(range(1,11),2)
        rdd.collect()
        [1,?2,?3,?4,?5,?6,?7,?8,?9,?10]

        二,常用Action操作

        Action操作將觸發(fā)基于RDD依賴關(guān)系的計(jì)算。

        collect

        rdd?=?sc.parallelize(range(10),5)?
        #collect操作將數(shù)據(jù)匯集到Driver,數(shù)據(jù)過大時(shí)有超內(nèi)存風(fēng)險(xiǎn)
        all_data?=?rdd.collect()
        all_data
        [0,?1,?2,?3,?4,?5,?6,?7,?8,?9]

        take

        #take操作將前若干個(gè)數(shù)據(jù)匯集到Driver,相比collect安全
        rdd?=?sc.parallelize(range(10),5)?
        part_data?=?rdd.take(4)
        part_data
        [0,?1,?2,?3]

        takeSample

        #takeSample可以隨機(jī)取若干個(gè)到Driver,第一個(gè)參數(shù)設(shè)置是否放回抽樣
        rdd?=?sc.parallelize(range(10),5)?
        sample_data?=?rdd.takeSample(False,10,0)
        sample_data
        [7,?8,?1,?5,?3,?4,?2,?0,?9,?6]

        first

        #first取第一個(gè)數(shù)據(jù)
        rdd?=?sc.parallelize(range(10),5)?
        first_data?=?rdd.first()
        print(first_data)
        0

        count

        #count查看RDD元素?cái)?shù)量
        rdd?=?sc.parallelize(range(10),5)
        data_count?=?rdd.count()
        print(data_count)
        10

        reduce

        #reduce利用二元函數(shù)對(duì)數(shù)據(jù)進(jìn)行規(guī)約
        rdd?=?sc.parallelize(range(10),5)?
        rdd.reduce(lambda?x,y:x+y)

        45

        foreach

        #foreach對(duì)每一個(gè)元素執(zhí)行某種操作,不生成新的RDD
        #累加器用法詳見共享變量
        rdd?=?sc.parallelize(range(10),5)?
        accum?=?sc.accumulator(0)
        rdd.foreach(lambda?x:accum.add(x))
        print(accum.value)
        45

        countByKey

        #countByKey對(duì)Pair?RDD按key統(tǒng)計(jì)數(shù)量
        pairRdd?=?sc.parallelize([(1,1),(1,4),(3,9),(2,16)])?
        pairRdd.countByKey()
        defaultdict(int,?{1:?2,?3:?1,?2:?1})

        saveAsTextFile

        #saveAsTextFile保存rdd成text文件到本地
        text_file?=?"./data/rdd.txt"
        rdd?=?sc.parallelize(range(5))
        rdd.saveAsTextFile(text_file)

        #重新讀入會(huì)被解析文本
        rdd_loaded?=?sc.textFile(file)
        rdd_loaded.collect()
        ['2',?'3',?'4',?'1',?'0']

        三,常用Transformation操作

        Transformation轉(zhuǎn)換操作具有懶惰執(zhí)行的特性,它只指定新的RDD和其父RDD的依賴關(guān)系,只有當(dāng)Action操作觸發(fā)到該依賴的時(shí)候,它才被計(jì)算。

        map

        #map操作對(duì)每個(gè)元素進(jìn)行一個(gè)映射轉(zhuǎn)換
        rdd?=?sc.parallelize(range(10),3)
        rdd.collect()
        [0,?1,?2,?3,?4,?5,?6,?7,?8,?9]
        rdd.map(lambda?x:x**2).collect()
        [0,?1,?4,?9,?16,?25,?36,?49,?64,?81]

        filter

        #filter應(yīng)用過濾條件過濾掉一些數(shù)據(jù)
        rdd?=?sc.parallelize(range(10),3)
        rdd.filter(lambda?x:x>5).collect()
        [6,?7,?8,?9]

        flatMap

        #flatMap操作執(zhí)行將每個(gè)元素生成一個(gè)Array后壓平
        rdd?=?sc.parallelize(["hello?world","hello?China"])
        rdd.map(lambda?x:x.split("?")).collect()
        [['hello',?'world'],?['hello',?'China']]
        rdd.flatMap(lambda?x:x.split("?")).collect()
        ['hello',?'world',?'hello',?'China']

        sample

        #sample對(duì)原rdd在每個(gè)分區(qū)按照比例進(jìn)行抽樣,第一個(gè)參數(shù)設(shè)置是否可以重復(fù)抽樣
        rdd?=?sc.parallelize(range(10),1)
        rdd.sample(False,0.5,0).collect()
        [1,?4,?9]

        distinct

        #distinct去重
        rdd?=?sc.parallelize([1,1,2,2,3,3,4,5])
        rdd.distinct().collect()
        [4,?1,?5,?2,?3]

        subtract

        #subtract找到屬于前一個(gè)rdd而不屬于后一個(gè)rdd的元素
        a?=?sc.parallelize(range(10))
        b?=?sc.parallelize(range(5,15))
        a.subtract(b).collect()
        [0,?1,?2,?3,?4]

        union

        #union合并數(shù)據(jù)
        a?=?sc.parallelize(range(5))
        b?=?sc.parallelize(range(3,8))
        a.union(b).collect()
        [0,?1,?2,?3,?4,?3,?4,?5,?6,?7]

        intersection

        #intersection求交集
        a?=?sc.parallelize(range(1,6))
        b?=?sc.parallelize(range(3,9))
        a.intersection(b).collect()
        [3,?4,?5]

        cartesian

        #cartesian笛卡爾積
        boys?=?sc.parallelize(["LiLei","Tom"])
        girls?=?sc.parallelize(["HanMeiMei","Lily"])
        boys.cartesian(girls).collect()

        [('LiLei',?'HanMeiMei'),
        ?('LiLei',?'Lily'),
        ?('Tom',?'HanMeiMei'),
        ?('Tom',?'Lily')]

        sortBy

        #按照某種方式進(jìn)行排序
        #指定按照第3個(gè)元素大小進(jìn)行排序
        rdd?=?sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
        rdd.sortBy(lambda?x:x[2]).collect()

        [(4,?1,?1),?(3,?2,?2),?(1,?2,?3)]

        zip

        #按照拉鏈方式連接兩個(gè)RDD,效果類似python的zip函數(shù)
        #需要兩個(gè)RDD具有相同的分區(qū),每個(gè)分區(qū)元素?cái)?shù)量相同

        rdd_name?=?sc.parallelize(["LiLei","Hanmeimei","Lily"])
        rdd_age?=?sc.parallelize([19,18,20])

        rdd_zip?=?rdd_name.zip(rdd_age)
        print(rdd_zip.collect())
        [('LiLei',?19),?('Hanmeimei',?18),?('Lily',?20)]

        zipWithIndex

        #將RDD和一個(gè)從0開始的遞增序列按照拉鏈方式連接。
        rdd_name?=??sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])
        rdd_index?=?rdd_name.zipWithIndex()
        print(rdd_index.collect())
        [('LiLei',?0),?('Hanmeimei',?1),?('Lily',?2),?('Lucy',?3),?('Ann',?4),?('Dachui',?5),?('RuHua',?6)]

        四,常用PairRDD的轉(zhuǎn)換操作

        PairRDD指的是數(shù)據(jù)為長(zhǎng)度為2的tuple類似(k,v)結(jié)構(gòu)的數(shù)據(jù)類型的RDD,其每個(gè)數(shù)據(jù)的第一個(gè)元素被當(dāng)做key,第二個(gè)元素被當(dāng)做value.

        reduceByKey

        #reduceByKey對(duì)相同的key對(duì)應(yīng)的values應(yīng)用二元?dú)w并操作
        rdd?=?sc.parallelize([("hello",1),("world",2),
        ???????????????????????????????("hello",3),("world",5)])
        rdd.reduceByKey(lambda?x,y:x+y).collect()
        [('hello',?4),?('world',?7)]

        groupByKey

        #groupByKey將相同的key對(duì)應(yīng)的values收集成一個(gè)Iterator
        rdd?=?sc.parallelize([("hello",1),("world",2),("hello",3),("world",5)])
        rdd.groupByKey().collect()
        [('hello',?),
        ?('world',?)]

        sortByKey

        #sortByKey按照key排序,可以指定是否降序
        rdd?=?sc.parallelize([("hello",1),("world",2),
        ???????????????????????????????("China",3),("Beijing",5)])
        rdd.sortByKey(False).collect()
        [('world',?2),?('hello',?1),?('China',?3),?('Beijing',?5)]

        join

        #join相當(dāng)于根據(jù)key進(jìn)行內(nèi)連接
        age?=?sc.parallelize([("LiLei",18),
        ????????????????????????("HanMeiMei",16),("Jim",20)])
        gender?=?sc.parallelize([("LiLei","male"),
        ????????????????????????("HanMeiMei","female"),("Lucy","female")])
        age.join(gender).collect()

        [('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]

        leftOuterJoin和rightOuterJoin

        #leftOuterJoin相當(dāng)于關(guān)系表的左連接

        age?=?sc.parallelize([("LiLei",18),
        ????????????????????????("HanMeiMei",16)])
        gender?=?sc.parallelize([("LiLei","male"),
        ????????????????????????("HanMeiMei","female"),("Lucy","female")])
        age.leftOuterJoin(gender).collect()

        [('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]
        #rightOuterJoin相當(dāng)于關(guān)系表的右連接
        age?=?sc.parallelize([("LiLei",18),
        ????????????????????????("HanMeiMei",16),("Jim",20)])
        gender?=?sc.parallelize([("LiLei","male"),
        ????????????????????????("HanMeiMei","female")])
        age.rightOuterJoin(gender).collect()

        [('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]

        cogroup

        #cogroup相當(dāng)于對(duì)兩個(gè)輸入分別goupByKey然后再對(duì)結(jié)果進(jìn)行g(shù)roupByKey

        x?=?sc.parallelize([("a",1),("b",2),("a",3)])
        y?=?sc.parallelize([("a",2),("b",3),("b",5)])

        result?=?x.cogroup(y).collect()
        print(result)
        print(list(result[0][1][0]))
        [('a',?(,?)),?('b',?(,?))]
        [1,?3]

        subtractByKey

        #subtractByKey去除x中那些key也在y中的元素

        x?=?sc.parallelize([("a",1),("b",2),("c",3)])
        y?=?sc.parallelize([("a",2),("b",(1,2))])

        x.subtractByKey(y).collect()
        [('c',?3)]

        foldByKey

        #foldByKey的操作和reduceByKey類似,但是要提供一個(gè)初始值
        x?=?sc.parallelize([("a",1),("b",2),("a",3),("b",5)],1)
        x.foldByKey(1,lambda?x,y:x*y).collect()

        [('a',?3),?('b',?10)]

        五,緩存操作

        如果一個(gè)rdd被多個(gè)任務(wù)用作中間量,那么對(duì)其進(jìn)行cache緩存到內(nèi)存中對(duì)加快計(jì)算會(huì)非常有幫助。

        聲明對(duì)一個(gè)rdd進(jìn)行cache后,該rdd不會(huì)被立即緩存,而是等到它第一次被計(jì)算出來時(shí)才進(jìn)行緩存。

        可以使用persist明確指定存儲(chǔ)級(jí)別,常用的存儲(chǔ)級(jí)別是MEMORY_ONLY和EMORY_AND_DISK。

        如果一個(gè)RDD后面不再用到,可以用unpersist釋放緩存,unpersist是立即執(zhí)行的。

        緩存數(shù)據(jù)不會(huì)切斷血緣依賴關(guān)系,這是因?yàn)榫彺鏀?shù)據(jù)某些分區(qū)所在的節(jié)點(diǎn)有可能會(huì)有故障,例如內(nèi)存溢出或者節(jié)點(diǎn)損壞。

        這時(shí)候可以根據(jù)血緣關(guān)系重新計(jì)算這個(gè)分區(qū)的數(shù)據(jù)。

        #cache緩存到內(nèi)存中,使用存儲(chǔ)級(jí)別 MEMORY_ONLY。
        #MEMORY_ONLY意味著如果內(nèi)存存儲(chǔ)不下,放棄存儲(chǔ)其余部分,需要時(shí)重新計(jì)算。
        a?=?sc.parallelize(range(10000),5)
        a.cache()
        sum_a?=?a.reduce(lambda?x,y:x+y)
        cnt_a?=?a.count()
        mean_a?=?sum_a/cnt_a

        print(mean_a)

        #persist緩存到內(nèi)存或磁盤中,默認(rèn)使用存儲(chǔ)級(jí)別MEMORY_AND_DISK
        #MEMORY_AND_DISK意味著如果內(nèi)存存儲(chǔ)不下,其余部分存儲(chǔ)到磁盤中。
        #persist可以指定其它存儲(chǔ)級(jí)別,cache相當(dāng)于persist(MEMORY_ONLY)
        from??pyspark.storagelevel?import?StorageLevel
        a?=?sc.parallelize(range(10000),5)
        a.persist(StorageLevel.MEMORY_AND_DISK)
        sum_a?=?a.reduce(lambda?x,y:x+y)
        cnt_a?=?a.count()
        mean_a?=?sum_a/cnt_a

        a.unpersist()?#立即釋放緩存
        print(mean_a)

        六,共享變量

        當(dāng)spark集群在許多節(jié)點(diǎn)上運(yùn)行一個(gè)函數(shù)時(shí),默認(rèn)情況下會(huì)把這個(gè)函數(shù)涉及到的對(duì)象在每個(gè)節(jié)點(diǎn)生成一個(gè)副本。

        但是,有時(shí)候需要在不同節(jié)點(diǎn)或者節(jié)點(diǎn)和Driver之間共享變量。

        Spark提供兩種類型的共享變量,廣播變量和累加器。

        廣播變量是不可變變量,實(shí)現(xiàn)在不同節(jié)點(diǎn)不同任務(wù)之間共享數(shù)據(jù)。

        廣播變量在每個(gè)機(jī)器上緩存一個(gè)只讀的變量,而不是為每個(gè)task生成一個(gè)副本,可以減少數(shù)據(jù)的傳輸。

        累加器主要是不同節(jié)點(diǎn)和Driver之間共享變量,只能實(shí)現(xiàn)計(jì)數(shù)或者累加功能。

        累加器的值只有在Driver上是可讀的,在節(jié)點(diǎn)上不可見。

        #廣播變量?broadcast?不可變,在所有節(jié)點(diǎn)可讀

        broads?=?sc.broadcast(100)

        rdd?=?sc.parallelize(range(10))
        print(rdd.map(lambda?x:x+broads.value).collect())

        print(broads.value)
        [100,?101,?102,?103,?104,?105,?106,?107,?108,?109]
        100
        #累加器?只能在Driver上可讀,在其它節(jié)點(diǎn)只能進(jìn)行累加

        total?=?sc.accumulator(0)
        rdd?=?sc.parallelize(range(10),3)

        rdd.foreach(lambda?x:total.add(x))
        total.value
        45
        #?計(jì)算數(shù)據(jù)的平均值
        rdd?=?sc.parallelize([1.1,2.1,3.1,4.1])
        total?=?sc.accumulator(0.1)
        count?=?sc.accumulator(0)

        def?func(x):
        ????total.add(x)
        ????count.add(1)
        ????
        rdd.foreach(func)

        total.value/count.value
        2.625

        七,分區(qū)操作

        分區(qū)操作包括改變分區(qū)操作,以及針對(duì)分區(qū)執(zhí)行的一些轉(zhuǎn)換操作。

        glom:將一個(gè)分區(qū)內(nèi)的數(shù)據(jù)轉(zhuǎn)換為一個(gè)列表作為一行。

        coalesce:shuffle可選,默認(rèn)為False情況下窄依賴,不能增加分區(qū)。repartition和partitionBy調(diào)用它實(shí)現(xiàn)。

        repartition:按隨機(jī)數(shù)進(jìn)行shuffle,相同key不一定在同一個(gè)分區(qū)

        partitionBy:按key進(jìn)行shuffle,相同key放入同一個(gè)分區(qū)

        HashPartitioner:默認(rèn)分區(qū)器,根據(jù)key的hash值進(jìn)行分區(qū),相同的key進(jìn)入同一分區(qū),效率較高,key不可為Array.

        RangePartitioner:只在排序相關(guān)函數(shù)中使用,除相同的key進(jìn)入同一分區(qū),相鄰的key也會(huì)進(jìn)入同一分區(qū),key必須可排序。

        TaskContext: ?獲取當(dāng)前分區(qū)id方法 TaskContext.get.partitionId

        mapPartitions:每次處理分區(qū)內(nèi)的一批數(shù)據(jù),適合需要分批處理數(shù)據(jù)的情況,比如將數(shù)據(jù)插入某個(gè)表,每批數(shù)據(jù)只需要開啟一次數(shù)據(jù)庫(kù)連接,大大減少了連接開支

        mapPartitionsWithIndex:類似mapPartitions,提供了分區(qū)索引,輸入?yún)?shù)為(i,Iterator)

        foreachPartition:類似foreach,但每次提供一個(gè)Partition的一批數(shù)據(jù)

        glom

        #glom將一個(gè)分區(qū)內(nèi)的數(shù)據(jù)轉(zhuǎn)換為一個(gè)列表作為一行。
        a?=?sc.parallelize(range(10),2)
        b?=?a.glom()
        b.collect()?
        [[0,?1,?2,?3,?4],?[5,?6,?7,?8,?9]]

        coalesce

        #coalesce?默認(rèn)shuffle為False,不能增加分區(qū),只能減少分區(qū)
        #如果要增加分區(qū),要設(shè)置shuffle?=?true
        #parallelize等許多操作可以指定分區(qū)數(shù)
        a?=?sc.parallelize(range(10),3)??
        print(a.getNumPartitions())
        print(a.glom().collect())

        3
        [[0,?1,?2],?[3,?4,?5],?[6,?7,?8,?9]]
        b?=?a.coalesce(2)?
        print(b.glom().collect())
        [[0,?1,?2],?[3,?4,?5,?6,?7,?8,?9]]

        repartition

        #repartition按隨機(jī)數(shù)進(jìn)行shuffle,相同key不一定在一個(gè)分區(qū),可以增加分區(qū)
        #repartition實(shí)際上調(diào)用coalesce實(shí)現(xiàn),設(shè)置了shuffle?=?True
        a?=?sc.parallelize(range(10),3)??
        c?=?a.repartition(4)?
        print(c.glom().collect())

        [[6,?7,?8,?9],?[3,?4,?5],?[],?[0,?1,?2]]
        #repartition按隨機(jī)數(shù)進(jìn)行shuffle,相同key不一定在一個(gè)分區(qū)
        a?=?sc.parallelize([("a",1),("a",1),("a",2),("c",3)])??
        c?=?a.repartition(2)
        print(c.glom().collect())
        [[('a',?1),?('a',?2),?('c',?3)],?[('a',?1)]]

        partitionBy

        #partitionBy按key進(jìn)行shuffle,相同key一定在一個(gè)分區(qū)
        a?=?sc.parallelize([("a",1),("a",1),("a",2),("c",3)])??
        c?=?a.partitionBy(2)
        print(c.glom().collect())

        mapPartitions

        #mapPartitions可以對(duì)每個(gè)分區(qū)分別執(zhí)行操作
        #每次處理分區(qū)內(nèi)的一批數(shù)據(jù),適合需要按批處理數(shù)據(jù)的情況
        #例如將數(shù)據(jù)寫入數(shù)據(jù)庫(kù)時(shí),可以極大的減少連接次數(shù)。
        #mapPartitions的輸入分區(qū)內(nèi)數(shù)據(jù)組成的Iterator,其輸出也需要是一個(gè)Iterator
        #以下例子查看每個(gè)分區(qū)內(nèi)的數(shù)據(jù),相當(dāng)于用mapPartitions實(shí)現(xiàn)了glom的功能。
        a?=?sc.parallelize(range(10),2)
        a.mapPartitions(lambda?it:iter([list(it)])).collect()
        [[0,?1,?2,?3,?4],?[5,?6,?7,?8,?9]]

        mapPartitionsWithIndex

        #mapPartitionsWithIndex可以獲取兩個(gè)參數(shù)
        #即分區(qū)id和每個(gè)分區(qū)內(nèi)的數(shù)據(jù)組成的Iterator
        a?=?sc.parallelize(range(11),2)

        def?func(pid,it):
        ????s?=?sum(it)
        ????return(iter([str(pid)?+?"|"?+?str(s)]))
        ????[str(pid)?+?"|"?+?str]
        b?=?a.mapPartitionsWithIndex(func)
        b.collect()
        #利用TaskContext可以獲取當(dāng)前每個(gè)元素的分區(qū)
        from?pyspark.taskcontext?import?TaskContext
        a?=?sc.parallelize(range(5),3)
        c?=?a.map(lambda?x:(TaskContext.get().partitionId(),x))
        c.collect()

        [(0,?0),?(1,?1),?(1,?2),?(2,?3),?(2,?4)]

        foreachPartitions

        #foreachPartition對(duì)每個(gè)分區(qū)分別執(zhí)行操作
        #范例:求每個(gè)分區(qū)內(nèi)最大值的和
        total?=?sc.accumulator(0.0)

        a?=?sc.parallelize(range(1,101),3)

        def?func(it):
        ????total.add(max(it))
        ????
        a.foreachPartition(func)
        total.value
        199.0

        aggregate

        #aggregate是一個(gè)Action操作
        #aggregate比較復(fù)雜,先對(duì)每個(gè)分區(qū)執(zhí)行一個(gè)函數(shù),再對(duì)每個(gè)分區(qū)結(jié)果執(zhí)行一個(gè)合并函數(shù)。
        #例子:求元素之和以及元素個(gè)數(shù)
        #三個(gè)參數(shù),第一個(gè)參數(shù)為初始值,第二個(gè)為分區(qū)執(zhí)行函數(shù),第三個(gè)為結(jié)果合并執(zhí)行函數(shù)。
        rdd?=?sc.parallelize(range(1,21),3)
        def?inner_func(t,x):
        ????return((t[0]+x,t[1]+1))

        def?outer_func(p,q):
        ????return((p[0]+q[0],p[1]+q[1]))

        rdd.aggregate((0,0),inner_func,outer_func)

        (210,?20)

        aggregateByKey

        #aggregateByKey的操作和aggregate類似,但是會(huì)對(duì)每個(gè)key分別進(jìn)行操作
        #第一個(gè)參數(shù)為初始值,第二個(gè)參數(shù)為分區(qū)內(nèi)歸并函數(shù),第三個(gè)參數(shù)為分區(qū)間歸并函數(shù)

        a?=?sc.parallelize([("a",1),("b",1),("c",2),
        ?????????????????????????????("a",2),("b",3)],3)
        b?=?a.aggregateByKey(0,lambda?x,y:max(x,y),
        ????????????????????????????lambda?x,y:max(x,y))
        b.collect()
        [('b',?3),?('a',?2),?('c',?2)]


        e61a2fb3ffd175084ef9c782da9689a4.webp

        公眾號(hào)后臺(tái)回復(fù)關(guān)鍵字:pyspark,獲取本項(xiàng)目github地址。


        如果覺得這道RDD大餐味道還不錯(cuò)的話,?歡迎點(diǎn)個(gè)再看,分享給你的朋友們喲。??



        瀏覽 71
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            啊用力用力用力流了好多水 | 99久久婷婷国产综精品喷水 | 狠狠撸综合网 | 在线视频自拍偷拍 | 色在线视频网站 | 中文字幕亚洲乱伦 | 影音先锋亚洲男人资源站 | 五月天婷婷大香蕉 | 亚洲一级黄色大片 | 亚洲AV成人片无码好多水 |