1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        golang channel 使用總結(jié)

        共 19975字,需瀏覽 40分鐘

         ·

        2021-09-06 15:26

        本文介紹了使用 golang channel 的諸多特性和技巧,已經(jīng)熟悉了 go 語言特性的小伙伴也可以看看,很有啟發(fā)。
        不同于傳統(tǒng)的多線程并發(fā)模型使用共享內(nèi)存來實(shí)現(xiàn)線程間通信的方式,golang 的哲學(xué)是通過 channel 進(jìn)行協(xié)程 (goroutine) 之間的通信來實(shí)現(xiàn)數(shù)據(jù)共享:

        Do not communicate by sharing memory; instead, share memory by communicating.

        這種方式的優(yōu)點(diǎn)是通過提供原子的通信原語,避免了競態(tài)情形 (race condition) 下復(fù)雜的鎖機(jī)制。channel 可以看成一個(gè) FIFO 隊(duì)列,對 FIFO 隊(duì)列的讀寫都是原子的操作,不需要加鎖。對 channel 的操作行為結(jié)果總結(jié)如下:
        操作nil channelclosed channelnot-closed non-nil channel
        closepanicpanic成功 close
        寫 ch <-一直阻塞panic阻塞或成功寫入數(shù)據(jù)
        讀 <- ch一直阻塞讀取對應(yīng)類型零值阻塞或成功讀取數(shù)據(jù)
        讀取一個(gè)已關(guān)閉的 channel 時(shí),總是能讀取到對應(yīng)類型的零值,為了和讀取非空未關(guān)閉 channel 的行為區(qū)別,可以使用兩個(gè)接收值:
        // ok is false when ch is closed
        v, ok := <-ch
        golang 中大部分類型都是值類型(只有 slice / channel / map 是引用類型),讀/寫類型是值類型的 channel 時(shí),如果元素 size 比較大時(shí),應(yīng)該使用指針代替,避免頻繁的內(nèi)存拷貝開銷。

        內(nèi)部實(shí)現(xiàn)

        如圖所示,在 channel 的內(nèi)部實(shí)現(xiàn)中(具體定義在 $GOROOT/src/runtime/chan.go 里),維護(hù)了 3 個(gè)隊(duì)列:
        • 讀等待協(xié)程隊(duì)列 recvq,維護(hù)了阻塞在讀此 channel 的協(xié)程列表
        • 寫等待協(xié)程隊(duì)列 sendq,維護(hù)了阻塞在寫此 channel 的協(xié)程列表
        • 緩沖數(shù)據(jù)隊(duì)列 buf,用環(huán)形隊(duì)列實(shí)現(xiàn),不帶緩沖的 channel 此隊(duì)列 size 則為 0
        img
        當(dāng)協(xié)程嘗試從未關(guān)閉的 channel 中讀取數(shù)據(jù)時(shí),內(nèi)部的操作如下:
        1. 當(dāng) buf 非空時(shí),此時(shí) recvq 必為空,buf 彈出一個(gè)元素給讀協(xié)程,讀協(xié)程獲得數(shù)據(jù)后繼續(xù)執(zhí)行,此時(shí)若 sendq 非空,則從 sendq 中彈出一個(gè)寫協(xié)程轉(zhuǎn)入 running 狀態(tài),待寫數(shù)據(jù)入隊(duì)列 buf ,此時(shí)讀取操作 <- ch 未阻塞;
        2. 當(dāng) buf 為空但 sendq 非空時(shí)(不帶緩沖的 channel),則從 sendq 中彈出一個(gè)寫協(xié)程轉(zhuǎn)入 running 狀態(tài),待寫數(shù)據(jù)直接傳遞給讀協(xié)程,讀協(xié)程繼續(xù)執(zhí)行,此時(shí)讀取操作 <- ch 未阻塞;
        3. 當(dāng) buf 為空并且 sendq 也為空時(shí),讀協(xié)程入隊(duì)列 recvq 并轉(zhuǎn)入 blocking 狀態(tài),當(dāng)后續(xù)有其他協(xié)程往 channel 寫數(shù)據(jù)時(shí),讀協(xié)程才會重新轉(zhuǎn)入 running 狀態(tài),此時(shí)讀取操作 <- ch 阻塞。
        類似的,當(dāng)協(xié)程嘗試往未關(guān)閉的 channel 中寫入數(shù)據(jù)時(shí),內(nèi)部的操作如下:
        1. 當(dāng)隊(duì)列 recvq 非空時(shí),此時(shí)隊(duì)列 buf 必為空,從 recvq 彈出一個(gè)讀協(xié)程接收待寫數(shù)據(jù),此讀協(xié)程此時(shí)結(jié)束阻塞并轉(zhuǎn)入 running 狀態(tài),寫協(xié)程繼續(xù)執(zhí)行,此時(shí)寫入操作 ch <- 未阻塞;
        2. 當(dāng)隊(duì)列 recvq 為空但 buf 未滿時(shí),此時(shí) sendq 必為空,寫協(xié)程的待寫數(shù)據(jù)入 buf 然后繼續(xù)執(zhí)行,此時(shí)寫入操作 ch <- 未阻塞;
        3. 當(dāng)隊(duì)列 recvq 為空并且 buf 為滿時(shí),此時(shí)寫協(xié)程入隊(duì)列 sendq 并轉(zhuǎn)入 blokcing 狀態(tài),當(dāng)后續(xù)有其他協(xié)程從 channel 中讀數(shù)據(jù)時(shí),寫協(xié)程才會重新轉(zhuǎn)入 running 狀態(tài),此時(shí)寫入操作 ch <- 阻塞。
        當(dāng)關(guān)閉 non-nil channel 時(shí),內(nèi)部的操作如下:
        1. 當(dāng)隊(duì)列 recvq 非空時(shí),此時(shí) buf 必為空,recvq 中的所有協(xié)程都將收到對應(yīng)類型的零值然后結(jié)束阻塞狀態(tài);
        2. 當(dāng)隊(duì)列 sendq 非空時(shí),此時(shí) buf 必為滿,sendq 中的所有協(xié)程都會產(chǎn)生 panic ,在 buf 中數(shù)據(jù)仍然會保留直到被其他協(xié)程讀取。

        使用場景

        除了常規(guī)的用來在協(xié)程之間傳遞數(shù)據(jù)外,本節(jié)列出了一些特殊的使用 channel 的場景。

        futures / promises

        golang 雖然沒有直接提供 futrue / promise 模型的操作原語,但通過 goroutine 和 channel 可以實(shí)現(xiàn)類似的功能:
        package main

        import (
            "io/ioutil"
            "log"
            "net/http"
        )

        // RequestFuture, http request promise.
        func RequestFuture(url string) <-chan []byte {
            c := make(chan []byte1)
            go func() {
                var body []byte
                defer func() {
                    c <- body
                }()

                res, err := http.Get(url)
                if err != nil {
                    return
                }
                defer res.Body.Close()

                body, _ = ioutil.ReadAll(res.Body)
            }()

            return c
        }

        func main() {
            future := RequestFuture("https://api.github.com/users/octocat/orgs")
            body := <-future
            log.Printf("reponse length: %d"len(body))
        }

        條件變量 (condition variable)

        類型于 POSIX 接口中線程通知其他線程某個(gè)事件發(fā)生的條件變量,channel 的特性也可以用來當(dāng)成協(xié)程之間同步的條件變量。因?yàn)?channel 只是用來通知,所以 channel 中具體的數(shù)據(jù)類型和值并不重要,這種場景一般用 strct {} 作為 channel 的類型。
        一對一通知
        類似 pthread_cond_signal() 的功能,用來在一個(gè)協(xié)程中通知另個(gè)某一個(gè)協(xié)程事件發(fā)生:
        package main

        import (
            "fmt"
            "time"
        )

        func main() {
            ch := make(chan struct{})
            nums := make([]int100)

            go func() {
                time.Sleep(time.Second)
                for i := 0; i < len(nums); i++ {
                    nums[i] = i
                }
                // send a finish signal
                ch <- struct{}{}
            }()

            // wait for finish signal
            <-ch
            fmt.Println(nums)
        }
        廣播通知
        類似 pthread_cond_broadcast() 的功能。利用從已關(guān)閉的 channel 讀取數(shù)據(jù)時(shí)總是非阻塞的特性,可以實(shí)現(xiàn)在一個(gè)協(xié)程中向其他多個(gè)協(xié)程廣播某個(gè)事件發(fā)生的通知:
        package main

        import (
            "fmt"
            "time"
        )

        func main() {
            N := 10
            exit := make(chan struct{})
            done := make(chan struct{}, N)

            // start N worker goroutines
            for i := 0; i < N; i++ {
                go func(n int) {
                    for {
                        select {
                        // wait for exit signal
                        case <-exit:
                            fmt.Printf("worker goroutine #%d exit\n", n)
                            done <- struct{}{}
                            return
                        case <-time.After(time.Second):
                            fmt.Printf("worker goroutine #%d is working...\n", n)
                        }
                    }
                }(i)
            }

            time.Sleep(3 * time.Second)
            // broadcast exit signal
            close(exit)
            // wait for all worker goroutines exit
            for i := 0; i < N; i++ {
                <-done
            }
            fmt.Println("main goroutine exit")
        }

        信號量

        channel 的讀/寫相當(dāng)于信號量的 P / V 操作,下面的示例程序中 channel 相當(dāng)于信號量:
        package main

        import (
            "log"
            "math/rand"
            "time"
        )

        type Seat int
        type Bar chan Seat

        func (bar Bar) ServeConsumer(customerId int) {
            log.Print("-> consumer#", customerId, " enters the bar")
            seat := <-bar // need a seat to drink
            log.Print("consumer#", customerId, " drinks at seat#", seat)
            time.Sleep(time.Second * time.Duration(2+rand.Intn(6)))
            log.Print("<- consumer#", customerId, " frees seat#", seat)
            bar <- seat // free the seat and leave the bar
        }

        func main() {
            rand.Seed(time.Now().UnixNano())

            bar24x7 := make(Bar, 10// the bar has 10 seats
            // Place seats in an bar.
            for seatId := 0; seatId < cap(bar24x7); seatId++ {
                bar24x7 <- Seat(seatId) // none of the sends will block
            }

            // a new consumer try to enter the bar for each second
            for customerId := 0; ; customerId++ {
                time.Sleep(time.Second)
                go bar24x7.ServeConsumer(customerId)
            }
        }

        互斥量

        互斥量相當(dāng)于二元信號里,所以 cap 為 1 的 channel 可以當(dāng)成互斥量使用:

        package main

        import "fmt"

        func main() {
            mutex := make(chan struct{}, 1// the capacity must be one

            counter := 0
            increase := func() {
                mutex <- struct{}{} // lock
                counter++
                <-mutex // unlock
            }

            increase1000 := func(done chan<- struct{}) {
                for i := 0; i < 1000; i++ {
                    increase()
                }
                done <- struct{}{}
            }

            done := make(chan struct{})
            go increase1000(done)
            <-done; <-done
            fmt.Println(counter) // 2000
        }

        關(guān)閉 channel

        關(guān)閉不再需要使用的 channel 并不是必須的。跟其他資源比如打開的文件、socket 連接不一樣,這類資源使用完后不關(guān)閉后會造成句柄泄露,channel 使用完后不關(guān)閉也沒有關(guān)系,channel 沒有被任何協(xié)程用到后最終會被 GC 回收。關(guān)閉 channel 一般是用來通知其他協(xié)程某個(gè)任務(wù)已經(jīng)完成了。golang 也沒有直接提供判斷 channel 是否已經(jīng)關(guān)閉的接口,雖然可以用其他不太優(yōu)雅的方式自己實(shí)現(xiàn)一個(gè):
        func isClosed(ch chan int) bool {
            select {
            case <-ch:
                return true
            default:
            }
            return false
        }
        不過實(shí)現(xiàn)一個(gè)這樣的接口也沒什么必要。因?yàn)榫退阃ㄟ^ isClosed() 得到當(dāng)前 channel 當(dāng)前還未關(guān)閉,如果試圖往 channel 里寫數(shù)據(jù),仍然可能會發(fā)生 panic ,因?yàn)樵谡{(diào)用 isClosed() 后,其他協(xié)程可能已經(jīng)把 channel 關(guān)閉了。關(guān)閉 channel 時(shí)應(yīng)該注意以下準(zhǔn)則:
        • 不要在讀取端關(guān)閉 channel ,因?yàn)閷懭攵藷o法知道 channel 是否已經(jīng)關(guān)閉,往已關(guān)閉的 channel 寫數(shù)據(jù)會 panic ;
        • 有多個(gè)寫入端時(shí),不要再寫入端關(guān)閉 channle ,因?yàn)槠渌麑懭攵藷o法知道 channel 是否已經(jīng)關(guān)閉,關(guān)閉已經(jīng)關(guān)閉的 channel 會發(fā)生 panic ;
        • 如果只有一個(gè)寫入端,可以在這個(gè)寫入端放心關(guān)閉 channel 。
        關(guān)閉 channel 粗暴一點(diǎn)的做法是隨意關(guān)閉,如果產(chǎn)生了 panic 就用 recover 避免進(jìn)程掛掉。稍好一點(diǎn)的方案是使用標(biāo)準(zhǔn)庫的 sync 包來做關(guān)閉 channel 時(shí)的協(xié)程同步,不過使用起來也稍微復(fù)雜些。下面介紹一種優(yōu)雅些的做法。

        一寫多讀

        這種場景下這個(gè)唯一的寫入端可以關(guān)閉 channel 用來通知讀取端所有數(shù)據(jù)都已經(jīng)寫入完成了。讀取端只需要用 for range 把 channel 中數(shù)據(jù)遍歷完就可以了,當(dāng) channel 關(guān)閉時(shí),for range 仍然會將 channel 緩沖中的數(shù)據(jù)全部遍歷完然后再退出循環(huán):
        package main

        import (
            "fmt"
            "sync"
        )

        func main() {
            wg := &sync.WaitGroup{}
            ch := make(chan int100)

            send := func() {
                for i := 0; i < 100; i++ {
                    ch <- i
                }
                // signal sending finish
                close(ch)
            }

            recv := func(id int) {
                defer wg.Done()
                for i := range ch {
                    fmt.Printf("receiver #%d get %d\n", id, i)
                }
                fmt.Printf("receiver #%d exit\n", id)
            }

            wg.Add(3)
            go recv(0)
            go recv(1)
            go recv(2)
            send()

            wg.Wait()
        }

        多寫一讀

        這種場景下雖然可以用 sync.Once 來解決多個(gè)寫入端重復(fù)關(guān)閉 channel 的問題,但更優(yōu)雅的辦法設(shè)置一個(gè)額外的 channel ,由讀取端通過關(guān)閉來通知寫入端任務(wù)完成不要再繼續(xù)再寫入數(shù)據(jù)了:
        package main

        import (
            "fmt"
            "sync"
        )

        func main() {
            wg := &sync.WaitGroup{}
            ch := make(chan int100)
            done := make(chan struct{})

            send := func(id int) {
                defer wg.Done()
                for i := 0; ; i++ {
                    select {
                    case <-done:
                        // get exit signal
                        fmt.Printf("sender #%d exit\n", id)
                        return
                    case ch <- id*1000 + i:
                    }
                }
            }

            recv := func() {
                count := 0
                for i := range ch {
                    fmt.Printf("receiver get %d\n", i)
                    count++
                    if count >= 1000 {
                        // signal recving finish
                        close(done)
                        return
                    }
                }
            }

            wg.Add(3)
            go send(0)
            go send(1)
            go send(2)
            recv()

            wg.Wait()
        }

        多寫多讀

        這種場景稍微復(fù)雜,和上面的例子一樣,也需要設(shè)置一個(gè)額外 channel 用來通知多個(gè)寫入端和讀取端。另外需要起一個(gè)額外的協(xié)程來通過關(guān)閉這個(gè) channel 來廣播通知:
        package main

        import (
            "fmt"
            "sync"
            "time"
        )

        func main() {
            wg := &sync.WaitGroup{}
            ch := make(chan int100)
            done := make(chan struct{})

            send := func(id int) {
                defer wg.Done()
                for i := 0; ; i++ {
                    select {
                    case <-done:
                        // get exit signal
                        fmt.Printf("sender #%d exit\n", id)
                        return
                    case ch <- id*1000 + i:
                    }
                }
            }

            recv := func(id int) {
                defer wg.Done()
                for {
                    select {
                    case <-done:
                        // get exit signal
                        fmt.Printf("receiver #%d exit\n", id)
                        return
                    case i := <-ch:
                        fmt.Printf("receiver #%d get %d\n", id, i)
                        time.Sleep(time.Millisecond)
                    }
                }
            }

            wg.Add(6)
            go send(0)
            go send(1)
            go send(2)
            go recv(0)
            go recv(1)
            go recv(2)

            time.Sleep(time.Second)
            // signal finish
            close(done)
            // wait all sender and receiver exit
            wg.Wait()
        }

        總結(jié)

        channle 作為 golang 最重要的特性,用起來還是比較爽的。傳統(tǒng)的 C 里要實(shí)現(xiàn)類型的功能的話,一般需要用到 socket 或者 FIFO 來實(shí)現(xiàn),另外還要考慮數(shù)據(jù)包的完整性與并發(fā)沖突的問題,channel 則屏蔽了這些底層細(xì)節(jié),使用者只需要考慮讀寫就可以了。channel 是引用類型,了解一下 channel 底層的機(jī)制對更好的使用 channel 還是很用必要的。雖然操作原語簡單,但涉及到阻塞的問題,使用不當(dāng)可能會造成死鎖或者無限制的協(xié)程創(chuàng)建最終導(dǎo)致進(jìn)程掛掉。
        channel 除在可以用來在協(xié)程之間通信外,其阻塞和喚醒協(xié)程的特性也可以用作協(xié)程之間的同步機(jī)制,文中也用示例簡單介紹了這種場景下的用法。
        關(guān)閉 channel 并不是必須的,只要沒有協(xié)程沒用引用 channel ,最終會被 GC 清理。所以使用的時(shí)候要特別注意,不要讓協(xié)程阻塞在 channel 上,這種情況很難檢測到,而且會造成 channel 和阻塞在 channel 的協(xié)程占有的資源無法被 GC 清理最終導(dǎo)致內(nèi)存泄露。
        channle 方便 golang 程序使用 CSP 的編程范形,但是 golang 是一種多范形的編程語言,golang 也支持傳統(tǒng)的通過共享內(nèi)存來通信的編程方式。終極的原則是根據(jù)場景選擇合適的編程范型,不要因?yàn)?channel 好用而濫用 CSP 。

        轉(zhuǎn)自:ExplorerMan

        cnblogs.com/ExMan/p/11710017.html

        文章轉(zhuǎn)載:Go開發(fā)大全

        (版權(quán)歸原作者所有,侵刪)


        點(diǎn)擊下方“閱讀原文”查看更多

        瀏覽 46
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            日韩在线国产 | 婷婷五月天丁香社 | 精品人妻无码一区二区三区91麻豆 | 男女视频免费观看 | 国产精品传媒在线 | 少妇娇妻邻居少妇水多 | 全部免费毛片免费播放视频 | 91久久爽无码人妻AⅤ精品牛牛 | 影音先锋国产精品 | 爱爱一级视频 |