1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        流數(shù)據(jù)處理利器

        共 753字,需瀏覽 2分鐘

         ·

        2020-10-14 08:20

        流處理 (Stream processing) 是一種計(jì)算機(jī)編程范式,其允許給定一個(gè)數(shù)據(jù)序列 (流處理數(shù)據(jù)源),一系列數(shù)據(jù)操作 (函數(shù)) 被應(yīng)用到流中的每個(gè)元素。同時(shí)流處理工具可以顯著提高程序員的開發(fā)效率,允許他們編寫有效、干凈和簡(jiǎn)潔的代碼。

        流數(shù)據(jù)處理在我們的日常工作中非常常見,舉個(gè)例子,我們?cè)跇I(yè)務(wù)開發(fā)中往往會(huì)記錄許多業(yè)務(wù)日志,這些日志一般是先發(fā)送到 Kafka,然后再由 Job 消費(fèi) Kafaka 寫到 elasticsearch,在進(jìn)行日志流處理的過程中,往往還會(huì)對(duì)日志做一些處理,比如過濾無效的日志,做一些計(jì)算以及重新組合日志等等,示意圖如下:


        流處理工具 fx

        go-zero是一個(gè)功能完備的微服務(wù)框架,框架中內(nèi)置了很多非常實(shí)用的工具,其中就包含流數(shù)據(jù)處理工具fx,下面我們通過一個(gè)簡(jiǎn)單的例子來認(rèn)識(shí)下該工具:

        package main

        import (
        "fmt"
        "os"
        "os/signal"
        "syscall"
        "time"

        "github.com/tal-tech/go-zero/core/fx"
        )

        func main() {
        ch := make(chan int)

        go inputStream(ch)
        go outputStream(ch)

        c := make(chan os.Signal, 1)
        signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
        <-c
        }

        func inputStream(ch chan int) {
        count := 0
        for {
        ch <- count
        time.Sleep(time.Millisecond * 500)
        count++
        }
        }

        func outputStream(ch chan int) {
        fx.From(func(source chan<- interface{}) {
        for c := range ch {
        source <- c
        }
        }).Walk(func(item interface{}, pipe chan<- interface{}) {
        count := item.(int)
        pipe <- count
        }).Filter(func(item interface{}) bool {
        itemInt := item.(int)
        if itemInt%2 == 0 {
        return true
        }
        return false
        }).ForEach(func(item interface{}) {
        fmt.Println(item)
        })
        }

        inputStream 函數(shù)模擬了流數(shù)據(jù)的產(chǎn)生,outputStream 函數(shù)模擬了流數(shù)據(jù)的處理過程,其中 From 函數(shù)為流的輸入,Walk 函數(shù)并發(fā)的作用在每一個(gè) item 上,F(xiàn)ilter 函數(shù)對(duì) item 進(jìn)行過濾為 true 保留為 false 不保留,F(xiàn)orEach 函數(shù)遍歷輸出每一個(gè) item 元素。

        流數(shù)據(jù)處理中間操作

        一個(gè)流的數(shù)據(jù)處理可能存在許多的中間操作,每個(gè)中間操作都可以作用在流上。就像流水線上的工人一樣,每個(gè)工人操作完零件后都會(huì)返回處理完成的新零件,同理流處理中間操作完成后也會(huì)返回一個(gè)新的流。



        fx 的流處理中間操作:

        操作函數(shù)功能輸入
        Distinct去除重復(fù)的 itemKeyFunc,返回需要去重的 key
        Filter過濾不滿足條件的 itemFilterFunc,Option 控制并發(fā)量
        Group對(duì) item 進(jìn)行分組KeyFunc,以 key 進(jìn)行分組
        Head取出前 n 個(gè) item,返回新 streamint64 保留數(shù)量
        Map對(duì)象轉(zhuǎn)換MapFunc,Option 控制并發(fā)量
        Merge合并 item 到 slice 并生成新 stream
        Reverse反轉(zhuǎn) item
        Sort對(duì) item 進(jìn)行排序LessFunc 實(shí)現(xiàn)排序算法
        Tail與 Head 功能類似,取出后 n 個(gè) item 組成新 streamint64 保留數(shù)量
        Walk作用在每個(gè) item 上WalkFunc,Option 控制并發(fā)量

        下圖展示了每個(gè)步驟和每個(gè)步驟的結(jié)果:



        用法與原理分析

        From

        通過 From 函數(shù)構(gòu)建流并返回 Stream,流數(shù)據(jù)通過 channel 進(jìn)行存儲(chǔ):

        // 例子
        s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
        fx.From(func(source chan<- interface{}) {
        for _, v := range s {
        source <- v
        }
        })

        // 源碼
        func From(generate GenerateFunc) Stream {
        source := make(chan interface{})

        go func() {
        defer close(source)
        // 構(gòu)造流數(shù)據(jù)寫入channel
        generate(source)
        }()

        return Range(source)
        }

        Filter

        Filter 函數(shù)提供過濾 item 的功能,F(xiàn)ilterFunc 定義過濾邏輯 true 保留 item,false 則不保留:

        // 例子 保留偶數(shù)
        s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
        fx.From(func(source chan<- interface{}) {
        for _, v := range s {
        source <- v
        }
        }).Filter(func(item interface{}) bool {
        if item.(int)%2 == 0 {
        return true
        }
        return false
        })

        // 源碼
        func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
        return p.Walk(func(item interface{}, pipe chan<- interface{}) {
        // 執(zhí)行過濾函數(shù)true保留,false丟棄
        if fn(item) {
        pipe <- item
        }
        }, opts...)
        }

        Group

        Group 對(duì)流數(shù)據(jù)進(jìn)行分組,需定義分組的 key,數(shù)據(jù)分組后以 slice 存入 channel:


        // 例子 按照首字符"g"或者"p"分組,沒有則分到另一組
        ss := []string{"golang", "google", "php", "python", "java", "c++"}
        fx.From(func(source chan<- interface{}) {
        for _, s := range ss {
        source <- s
        }
        }).Group(func(item interface{}) interface{} {
        if strings.HasPrefix(item.(string), "g") {
        return "g"
        } else if strings.HasPrefix(item.(string), "p") {
        return "p"
        }
        return ""
        }).ForEach(func(item interface{}) {
        fmt.Println(item)
        })

        // 源碼
        func (p Stream) Group(fn KeyFunc) Stream {
        // 定義分組存儲(chǔ)map
        groups := make(map[interface{}][]interface{})
        for item := range p.source {
        // 用戶自定義分組key
        key := fn(item)
        // key相同分到一組
        groups[key] = append(groups[key], item)
        }

        source := make(chan interface{})
        go func() {
        for _, group := range groups {
        // 相同key的一組數(shù)據(jù)寫入到channel
        source <- group
        }
        close(source)
        }()

        return Range(source)
        }

        Reverse

        reverse 可以對(duì)流中元素進(jìn)行反轉(zhuǎn)處理:



        // 例子
        fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {
        fmt.Println(item)
        })

        // 源碼
        func (p Stream) Reverse() Stream {
        var items []interface{}
        // 獲取流中數(shù)據(jù)
        for item := range p.source {
        items = append(items, item)
        }
        // 反轉(zhuǎn)算法
        for i := len(items)/2 - 1; i >= 0; i-- {
        opp := len(items) - 1 - i
        items[i], items[opp] = items[opp], items[i]
        }

        // 寫入流
        return Just(items...)
        }

        Distinct

        distinct 對(duì)流中元素進(jìn)行去重,去重在業(yè)務(wù)開發(fā)中比較常用,經(jīng)常需要對(duì)用戶 id 等做去重操作:

        // 例子
        fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {
        return item
        }).ForEach(func(item interface{}) {
        fmt.Println(item)
        })
        // 結(jié)果為 1,2,3,4,5,6

        // 源碼
        func (p Stream) Distinct(fn KeyFunc) Stream {
        source := make(chan interface{})

        threading.GoSafe(func() {
        defer close(source)
        // 通過key進(jìn)行去重,相同key只保留一個(gè)
        keys := make(map[interface{}]lang.PlaceholderType)
        for item := range p.source {
        key := fn(item)
        // key存在則不保留
        if _, ok := keys[key]; !ok {
        source <- item
        keys[key] = lang.Placeholder
        }
        }
        })

        return Range(source)
        }

        Walk

        Walk 函數(shù)并發(fā)的作用在流中每一個(gè) item 上,可以通過 WithWorkers 設(shè)置并發(fā)數(shù),默認(rèn)并發(fā)數(shù)為 16,最小并發(fā)數(shù)為 1,如設(shè)置 unlimitedWorkers 為 true 則并發(fā)數(shù)無限制,但并發(fā)寫入流中的數(shù)據(jù)由 defaultWorkers 限制,WalkFunc 中用戶可以自定義后續(xù)寫入流中的元素,可以不寫入也可以寫入多個(gè)元素:

        // 例子
        fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {
        newItem := strings.ToUpper(item.(string))
        pipe <- newItem
        }).ForEach(func(item interface{}) {
        fmt.Println(item)
        })

        // 源碼
        func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
        pipe := make(chan interface{}, option.workers)

        go func() {
        var wg sync.WaitGroup
        pool := make(chan lang.PlaceholderType, option.workers)

        for {
        // 控制并發(fā)數(shù)量
        pool <- lang.Placeholder
        item, ok := <-p.source
        if !ok {
        <-pool
        break
        }

        wg.Add(1)
        go func() {
        defer func() {
        wg.Done()
        <-pool
        }()
        // 作用在每個(gè)元素上
        fn(item, pipe)
        }()
        }

        // 等待處理完成
        wg.Wait()
        close(pipe)
        }()

        return Range(pipe)
        }

        并發(fā)處理

        fx 工具除了進(jìn)行流數(shù)據(jù)處理以外還提供了函數(shù)并發(fā)功能,在微服務(wù)中實(shí)現(xiàn)某個(gè)功能往往需要依賴多個(gè)服務(wù),并發(fā)的處理依賴可以有效的降低依賴耗時(shí),提升服務(wù)的性能。



        fx.Parallel(func() {
        userRPC() // 依賴1
        }, func() {
        accountRPC() // 依賴2
        }, func() {
        orderRPC() // 依賴3
        })

        注意 fx.Parallel 進(jìn)行依賴并行處理的時(shí)候不會(huì)有 error 返回,如需有 error 返回或者有一個(gè)依賴報(bào)錯(cuò)需要立馬結(jié)束依賴請(qǐng)求請(qǐng)使用MapReduce工具進(jìn)行處理。

        總結(jié)

        本篇文章介紹了流處理的基本概念和 go-zero 中的流處理工具 fx,在實(shí)際的生產(chǎn)中流處理場(chǎng)景應(yīng)用也非常多,希望本篇文章能給大家?guī)硪欢ǖ膯l(fā),更好的應(yīng)對(duì)工作中的流處理場(chǎng)景。

        項(xiàng)目地址

        https://github.com/tal-tech/go-zero

        組件地址

        https://github.com/tal-tech/go-zero/tree/master/core/fx

        Example

        https://github.com/tal-tech/go-zero/tree/master/example/fx

        微信交流群

        瀏覽 51
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            k200Tv免费看片成人 | 成人 免费视频在线看一个小时 | 哺乳期丰满乳亲伦小说 | 黄色影片A片 | 靠逼视频免费网站 | ass真裸体pics | 操逼的视频免费 | 露出两个乳让男人玩 | 最近日本中文字幕中文翻译歌词 | 野外老女人黄片 |