golang channel 使用總結(jié)


Do not communicate by sharing memory; instead, share memory by communicating.
| 操作 | nil channel | closed channel | not-closed non-nil channel |
|---|---|---|---|
| close | panic | panic | 成功 close |
寫 ch <- | 一直阻塞 | panic | 阻塞或成功寫入數(shù)據(jù) |
讀 <- ch | 一直阻塞 | 讀取對應(yīng)類型零值 | 阻塞或成功讀取數(shù)據(jù) |
// ok is false when ch is closed
v, ok := <-ch
內(nèi)部實(shí)現(xiàn)
$GOROOT/src/runtime/chan.go 里),維護(hù)了 3 個(gè)隊(duì)列:讀等待協(xié)程隊(duì)列 recvq,維護(hù)了阻塞在讀此 channel 的協(xié)程列表 寫等待協(xié)程隊(duì)列 sendq,維護(hù)了阻塞在寫此 channel 的協(xié)程列表 緩沖數(shù)據(jù)隊(duì)列 buf,用環(huán)形隊(duì)列實(shí)現(xiàn),不帶緩沖的 channel 此隊(duì)列 size 則為 0

當(dāng) buf 非空時(shí),此時(shí) recvq 必為空,buf 彈出一個(gè)元素給讀協(xié)程,讀協(xié)程獲得數(shù)據(jù)后繼續(xù)執(zhí)行,此時(shí)若 sendq 非空,則從 sendq 中彈出一個(gè)寫協(xié)程轉(zhuǎn)入 running 狀態(tài),待寫數(shù)據(jù)入隊(duì)列 buf ,此時(shí)讀取操作 <- ch未阻塞;當(dāng) buf 為空但 sendq 非空時(shí)(不帶緩沖的 channel),則從 sendq 中彈出一個(gè)寫協(xié)程轉(zhuǎn)入 running 狀態(tài),待寫數(shù)據(jù)直接傳遞給讀協(xié)程,讀協(xié)程繼續(xù)執(zhí)行,此時(shí)讀取操作 <- ch未阻塞;當(dāng) buf 為空并且 sendq 也為空時(shí),讀協(xié)程入隊(duì)列 recvq 并轉(zhuǎn)入 blocking 狀態(tài),當(dāng)后續(xù)有其他協(xié)程往 channel 寫數(shù)據(jù)時(shí),讀協(xié)程才會重新轉(zhuǎn)入 running 狀態(tài),此時(shí)讀取操作 <- ch阻塞。
當(dāng)隊(duì)列 recvq 非空時(shí),此時(shí)隊(duì)列 buf 必為空,從 recvq 彈出一個(gè)讀協(xié)程接收待寫數(shù)據(jù),此讀協(xié)程此時(shí)結(jié)束阻塞并轉(zhuǎn)入 running 狀態(tài),寫協(xié)程繼續(xù)執(zhí)行,此時(shí)寫入操作 ch <-未阻塞;當(dāng)隊(duì)列 recvq 為空但 buf 未滿時(shí),此時(shí) sendq 必為空,寫協(xié)程的待寫數(shù)據(jù)入 buf 然后繼續(xù)執(zhí)行,此時(shí)寫入操作 ch <-未阻塞;當(dāng)隊(duì)列 recvq 為空并且 buf 為滿時(shí),此時(shí)寫協(xié)程入隊(duì)列 sendq 并轉(zhuǎn)入 blokcing 狀態(tài),當(dāng)后續(xù)有其他協(xié)程從 channel 中讀數(shù)據(jù)時(shí),寫協(xié)程才會重新轉(zhuǎn)入 running 狀態(tài),此時(shí)寫入操作 ch <-阻塞。
當(dāng)隊(duì)列 recvq 非空時(shí),此時(shí) buf 必為空,recvq 中的所有協(xié)程都將收到對應(yīng)類型的零值然后結(jié)束阻塞狀態(tài); 當(dāng)隊(duì)列 sendq 非空時(shí),此時(shí) buf 必為滿,sendq 中的所有協(xié)程都會產(chǎn)生 panic ,在 buf 中數(shù)據(jù)仍然會保留直到被其他協(xié)程讀取。
使用場景
futures / promises
package main
import (
"io/ioutil"
"log"
"net/http"
)
// RequestFuture, http request promise.
func RequestFuture(url string) <-chan []byte {
c := make(chan []byte, 1)
go func() {
var body []byte
defer func() {
c <- body
}()
res, err := http.Get(url)
if err != nil {
return
}
defer res.Body.Close()
body, _ = ioutil.ReadAll(res.Body)
}()
return c
}
func main() {
future := RequestFuture("https://api.github.com/users/octocat/orgs")
body := <-future
log.Printf("reponse length: %d", len(body))
}
條件變量 (condition variable)
strct {} 作為 channel 的類型。一對一通知
pthread_cond_signal() 的功能,用來在一個(gè)協(xié)程中通知另個(gè)某一個(gè)協(xié)程事件發(fā)生:package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan struct{})
nums := make([]int, 100)
go func() {
time.Sleep(time.Second)
for i := 0; i < len(nums); i++ {
nums[i] = i
}
// send a finish signal
ch <- struct{}{}
}()
// wait for finish signal
<-ch
fmt.Println(nums)
}
廣播通知
pthread_cond_broadcast() 的功能。利用從已關(guān)閉的 channel 讀取數(shù)據(jù)時(shí)總是非阻塞的特性,可以實(shí)現(xiàn)在一個(gè)協(xié)程中向其他多個(gè)協(xié)程廣播某個(gè)事件發(fā)生的通知:package main
import (
"fmt"
"time"
)
func main() {
N := 10
exit := make(chan struct{})
done := make(chan struct{}, N)
// start N worker goroutines
for i := 0; i < N; i++ {
go func(n int) {
for {
select {
// wait for exit signal
case <-exit:
fmt.Printf("worker goroutine #%d exit\n", n)
done <- struct{}{}
return
case <-time.After(time.Second):
fmt.Printf("worker goroutine #%d is working...\n", n)
}
}
}(i)
}
time.Sleep(3 * time.Second)
// broadcast exit signal
close(exit)
// wait for all worker goroutines exit
for i := 0; i < N; i++ {
<-done
}
fmt.Println("main goroutine exit")
}
信號量
package main
import (
"log"
"math/rand"
"time"
)
type Seat int
type Bar chan Seat
func (bar Bar) ServeConsumer(customerId int) {
log.Print("-> consumer#", customerId, " enters the bar")
seat := <-bar // need a seat to drink
log.Print("consumer#", customerId, " drinks at seat#", seat)
time.Sleep(time.Second * time.Duration(2+rand.Intn(6)))
log.Print("<- consumer#", customerId, " frees seat#", seat)
bar <- seat // free the seat and leave the bar
}
func main() {
rand.Seed(time.Now().UnixNano())
bar24x7 := make(Bar, 10) // the bar has 10 seats
// Place seats in an bar.
for seatId := 0; seatId < cap(bar24x7); seatId++ {
bar24x7 <- Seat(seatId) // none of the sends will block
}
// a new consumer try to enter the bar for each second
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
go bar24x7.ServeConsumer(customerId)
}
}
互斥量
互斥量相當(dāng)于二元信號里,所以 cap 為 1 的 channel 可以當(dāng)成互斥量使用:
package main
import "fmt"
func main() {
mutex := make(chan struct{}, 1) // the capacity must be one
counter := 0
increase := func() {
mutex <- struct{}{} // lock
counter++
<-mutex // unlock
}
increase1000 := func(done chan<- struct{}) {
for i := 0; i < 1000; i++ {
increase()
}
done <- struct{}{}
}
done := make(chan struct{})
go increase1000(done)
<-done; <-done
fmt.Println(counter) // 2000
}
關(guān)閉 channel
func isClosed(ch chan int) bool {
select {
case <-ch:
return true
default:
}
return false
}
isClosed() 得到當(dāng)前 channel 當(dāng)前還未關(guān)閉,如果試圖往 channel 里寫數(shù)據(jù),仍然可能會發(fā)生 panic ,因?yàn)樵谡{(diào)用 isClosed() 后,其他協(xié)程可能已經(jīng)把 channel 關(guān)閉了。關(guān)閉 channel 時(shí)應(yīng)該注意以下準(zhǔn)則:不要在讀取端關(guān)閉 channel ,因?yàn)閷懭攵藷o法知道 channel 是否已經(jīng)關(guān)閉,往已關(guān)閉的 channel 寫數(shù)據(jù)會 panic ; 有多個(gè)寫入端時(shí),不要再寫入端關(guān)閉 channle ,因?yàn)槠渌麑懭攵藷o法知道 channel 是否已經(jīng)關(guān)閉,關(guān)閉已經(jīng)關(guān)閉的 channel 會發(fā)生 panic ; 如果只有一個(gè)寫入端,可以在這個(gè)寫入端放心關(guān)閉 channel 。
sync 包來做關(guān)閉 channel 時(shí)的協(xié)程同步,不過使用起來也稍微復(fù)雜些。下面介紹一種優(yōu)雅些的做法。一寫多讀
for range 把 channel 中數(shù)據(jù)遍歷完就可以了,當(dāng) channel 關(guān)閉時(shí),for range 仍然會將 channel 緩沖中的數(shù)據(jù)全部遍歷完然后再退出循環(huán):package main
import (
"fmt"
"sync"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
send := func() {
for i := 0; i < 100; i++ {
ch <- i
}
// signal sending finish
close(ch)
}
recv := func(id int) {
defer wg.Done()
for i := range ch {
fmt.Printf("receiver #%d get %d\n", id, i)
}
fmt.Printf("receiver #%d exit\n", id)
}
wg.Add(3)
go recv(0)
go recv(1)
go recv(2)
send()
wg.Wait()
}
多寫一讀
sync.Once 來解決多個(gè)寫入端重復(fù)關(guān)閉 channel 的問題,但更優(yōu)雅的辦法設(shè)置一個(gè)額外的 channel ,由讀取端通過關(guān)閉來通知寫入端任務(wù)完成不要再繼續(xù)再寫入數(shù)據(jù)了:package main
import (
"fmt"
"sync"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
done := make(chan struct{})
send := func(id int) {
defer wg.Done()
for i := 0; ; i++ {
select {
case <-done:
// get exit signal
fmt.Printf("sender #%d exit\n", id)
return
case ch <- id*1000 + i:
}
}
}
recv := func() {
count := 0
for i := range ch {
fmt.Printf("receiver get %d\n", i)
count++
if count >= 1000 {
// signal recving finish
close(done)
return
}
}
}
wg.Add(3)
go send(0)
go send(1)
go send(2)
recv()
wg.Wait()
}
多寫多讀
package main
import (
"fmt"
"sync"
"time"
)
func main() {
wg := &sync.WaitGroup{}
ch := make(chan int, 100)
done := make(chan struct{})
send := func(id int) {
defer wg.Done()
for i := 0; ; i++ {
select {
case <-done:
// get exit signal
fmt.Printf("sender #%d exit\n", id)
return
case ch <- id*1000 + i:
}
}
}
recv := func(id int) {
defer wg.Done()
for {
select {
case <-done:
// get exit signal
fmt.Printf("receiver #%d exit\n", id)
return
case i := <-ch:
fmt.Printf("receiver #%d get %d\n", id, i)
time.Sleep(time.Millisecond)
}
}
}
wg.Add(6)
go send(0)
go send(1)
go send(2)
go recv(0)
go recv(1)
go recv(2)
time.Sleep(time.Second)
// signal finish
close(done)
// wait all sender and receiver exit
wg.Wait()
}
總結(jié)
轉(zhuǎn)自:ExplorerMan
cnblogs.com/ExMan/p/11710017.html
文章轉(zhuǎn)載:Go開發(fā)大全
(版權(quán)歸原作者所有,侵刪)
![]()

點(diǎn)擊下方“閱讀原文”查看更多
評論
圖片
表情
