1. 你真的了解 sync.Mutex嗎

        共 6846字,需瀏覽 14分鐘

         ·

        2021-02-20 15:11

        Mutex是一個互斥的排他鎖,零值Mutex為未上鎖狀態(tài),Mutex一旦被使用 禁止被拷貝。使用起來也比較簡單

        package?main

        import?"sync"

        func?main()?{
        ?m?:=?sync.Mutex{}
        ?m.Lock()
        ?defer?m.Unlock()
        ??//?do?something
        }

        Mutex有兩種操作模式:

        • 正常模式(非公平模式)

        阻塞等待的goroutine保存在FIFO的隊列中,喚醒的goroutine不直接擁有鎖,需要與新來的goroutine競爭獲取鎖。因為新來的goroutine很多已經(jīng)占有了CPU,所以喚醒的goroutine在競爭中很容易輸;但如果一個goroutine獲取鎖失敗超過1ms,則會將Mutex切換為饑餓模式。

        • 饑餓模式(公平模式)

        這種模式下,直接將等待隊列隊頭goroutine解鎖goroutine;新來的gorountine也不會嘗試獲得鎖,而是直接插入到等待隊列隊尾。

        mutex mode

        如果一個goroutine獲得了鎖,并且他在等待隊列隊尾 或者 他等待小于1ms,則會將Mutex的模式切換回正常模式。正常模式有更好的性能,新來的goroutine通過幾次競爭可以直接獲取到鎖,盡管當前仍有等待的goroutine。而饑餓模式則是對正常模式的補充,防止等待隊列中的goroutine永遠沒有機會獲取鎖。

        其數(shù)據(jù)結構為:

        type?Mutex?struct?{
        ?state?int32?//?鎖競爭的狀態(tài)值
        ?sema??uint32?//?信號量
        }

        state代表了當前鎖的狀態(tài)、 是否是存在自旋、是否是饑餓模式、阻塞goroutine數(shù)量

        ?mutexLocked?=?1?<iota?//?mutex?is?locked
        ?mutexWoken
        ?mutexStarving
        ?mutexWaiterShift?=?iota

        mutex state

        mutex.state & mutexLocked 加鎖狀態(tài) 1 表示已加鎖 0 表示未加鎖

        mutex.state & mutexWoken ?喚醒狀態(tài) 1 表示已喚醒狀態(tài) 0 表示未喚醒

        mutex.state & mutexStarving ?饑餓狀態(tài) ?1 表示饑餓狀態(tài)?0表示正常狀態(tài)

        mutex.state >> mutexWaiterShift得到當前goroutine數(shù)目

        Lock

        上鎖大致分為fast-pathslow-path

        Fast-path

        lock通過調(diào)用atomic.CompareAndSwapInt32來競爭更新m.state,成功則獲得鎖;失敗,則進入slow-path

        func?(m?*Mutex)?Lock()?{
        ?//?Fast?path:?grab?unlocked?mutex.
        ?if?atomic.CompareAndSwapInt32(&m.state,?0,?mutexLocked)?{
        ??if?race.Enabled?{
        ???race.Acquire(unsafe.Pointer(m))
        ??}
        ??return
        ?}
        ?//?Slow?path?(outlined?so?that?the?fast?path?can?be?inlined)
        ?m.lockSlow()
        }

        atomic.CompareAndSwapInt32正如簽名一樣,進行比較交換操作,這過程是原子的

        //?CompareAndSwapInt32?executes?the?compare-and-swap?operation?for?an?int32?value.
        func?CompareAndSwapInt32(addr?*int32,?old,?new?int32)?(swapped?bool)

        源碼中我們并不能看到該函數(shù)的具體實現(xiàn),他的實現(xiàn)跟硬件平臺有關,我們可以查看匯編代碼一窺究竟,go tool compile -S mutex.go也可以對二進制文件go tool objdump -s methodname binary

        	0x0036 00054 (loop.go:6)	MOVQ	AX, CX
        0x0039 00057 ($GOROOT/src/sync/mutex.go:74) XORL AX, AX
        0x003b 00059 ($GOROOT/src/sync/mutex.go:74) MOVL $1, DX
        0x0040 00064 ($GOROOT/src/sync/mutex.go:74) LOCK
        0x0041 00065 ($GOROOT/src/sync/mutex.go:74) CMPXCHGL DX, (CX)
        0x0044 00068 ($GOROOT/src/sync/mutex.go:74) SETEQ AL
        0x0047 00071 ($GOROOT/src/sync/mutex.go:74) TESTB AL, AL
        0x0049 00073 ($GOROOT/src/sync/mutex.go:74) JEQ 150
        0x004b 00075 (loop.go:8) MOVL $8, ""..autotmp_6+16(SP)
        0x0053 00083 (loop.go:8) LEAQ sync.(*Mutex).Unlock·f(SB), AX

        重點關注第5行CMPXCHGL DX, (CX)這個CMPXCHGL是x86和Intel架構中的compare and exchange指令,Java的那套AtomicXX底層也是依賴這個指令來保證原子性操作的。

        所以我們看到Mutex是互斥排他鎖且不可重入,當我們在一個goroutine獲取同一個鎖會導致死鎖。

        package?main

        import?"sync"

        func?main()?{
        ?m?:=?sync.Mutex{}
        ?m.Lock()
        ??//這里會導致死鎖
        ?m.Lock()
        ?defer?m.Unlock()
        }

        slow-path

        如果goroutinefast-path失敗,則調(diào)用m.lockSlow()進入slow-path,函數(shù)內(nèi)部主要是一個for{}死循環(huán),進入循環(huán)的goroutine大致分為兩類:

        • 新來的gorountine
        • 被喚醒的goroutine

        Mutex默認為正常模式,若新來的goroutine搶占成功,則另一個就需要阻塞等待;阻塞等待一旦超過閾值1ms則會將Mutex切換到饑餓模式,這個模式下新來的goroutine只能阻塞等待在隊列尾部,沒有搶占的資格。當然等待阻塞->喚醒->參與搶占鎖,這個過程顯示不是很高效,所以這里有一個自旋的優(yōu)化

        當mutex處于正常模式且能夠自旋,會讓當前goroutine自旋等待,同時設置mutex.state的mutexWoken位為1,保證自旋等待的goroutine一定比新來goroutine更有優(yōu)先權。這樣unlock操作也會優(yōu)先保證自旋等待的goroutine獲取鎖

        golang對自旋做了些限制要求 需要:

        • 多核CPU
        • GOMAXPROCS>1
        • 至少有一個運行的P并且local的P隊列為空

        感興趣的可以跟下源碼比較簡單

        func?(m?*Mutex)?lockSlow()?{
        ?var?waitStartTime?int64
        ?starving?:=?false
        ?awoke?:=?false
        ?iter?:=?0
        ?old?:=?m.state
        ?for?{
        ????//饑餓模式下不能自旋,也沒有資格搶占,鎖是手遞手給到等待的goroutine
        ??if?old&(mutexLocked|mutexStarving)?==?mutexLocked?&&?runtime_canSpin(iter)?{//當Mutex處于正常模式且能夠自旋
        ??????//設置mutexWoken為1?告訴unlock操作,存在自旋gorountine?unlock后不需要喚醒其他goroutine
        ???if?!awoke?&&?old&mutexWoken?==?0?&&?old>>mutexWaiterShift?!=?0?&&
        ????atomic.CompareAndSwapInt32(&m.state,?old,?old|mutexWoken)?{
        ????awoke?=?true
        ???}
        ???runtime_doSpin()
        ???iter++
        ???old?=?m.state
        ???continue
        ??}
        ??//??自旋完了?還是沒拿到鎖
        ??new?:=?old
        ????//當mutex處于正常模式,將new的mutexLocked設置為1?即準備搶占鎖
        ??if?old&mutexStarving?==?0?{
        ???new?|=?mutexLocked
        ??}
        ????//加鎖狀態(tài)或饑餓模式下?新來的goroutine進入等待隊列
        ??if?old&(mutexLocked|mutexStarving)?!=?0?{
        ???new?+=?1?<??}

        ????//將Mutex切換為饑餓模式,若未加鎖則不必切換
        ????//Unlock操作希望饑餓模式存在等待者
        ??if?starving?&&?old&mutexLocked?!=?0?{
        ???new?|=?mutexStarving
        ??}
        ??if?awoke?{
        ??????//?當前goroutine自旋過?已被被喚醒,則需要將mutexWoken重置
        ???if?new&mutexWoken?==?0?{
        ????throw("sync:?inconsistent?mutex?state")
        ???}
        ???new?&^=?mutexWoken?//重置mutexWoken
        ??}
        ??if?atomic.CompareAndSwapInt32(&m.state,?old,?new)?{
        ??????//?當前goroutine獲取鎖前mutex處于未加鎖?正常模式下
        ???if?old&(mutexLocked|mutexStarving)?==?0?{
        ????break?//?使用CAS成功搶占到鎖
        ???}
        ???//?waitStartTime!=0表示當前goroutine是等待狀態(tài)喚醒的?
        ??????//?為了與第一次調(diào)用Lock的goroutine劃分不同的優(yōu)先級
        ???queueLifo?:=?waitStartTime?!=?0
        ???if?waitStartTime?==?0?{
        ????????//開始記錄等待時間
        ????waitStartTime?=?runtime_nanotime()
        ???}
        ??????//?將被喚醒但是沒有獲得鎖的goroutine插入到當前等待隊列隊首
        ??????//?使用信號量阻塞當前goroutine
        ???runtime_SemacquireMutex(&m.sema,?queueLifo,?1)
        ??????//?當goroutine等待時間超過starvationThresholdNs,mutex進入饑餓模式
        ???starving?=?starving?||?runtime_nanotime()-waitStartTime?>?starvationThresholdNs
        ???old?=?m.state
        ???if?old&mutexStarving?!=?0?{
        ????????//如果當前goroutine被喚醒且mutex處于饑餓模式?則將鎖手遞手交給當前goroutine
        ????if?old&(mutexLocked|mutexWoken)?!=?0?||?old>>mutexWaiterShift?==?0?{
        ?????throw("sync:?inconsistent?mutex?state")
        ????}
        ????????//等待狀態(tài)的goroutine?-?1
        ????delta?:=?int32(mutexLocked?-?1<????????//如果等待時間小于1ms?或?當前goroutine是隊列中最后一個
        ????if?!starving?||?old>>mutexWaiterShift?==?1?{
        ??????//?退出饑餓模式
        ?????delta?-=?mutexStarving
        ????}
        ????atomic.AddInt32(&m.state,?delta)
        ????break
        ???}
        ???awoke?=?true
        ???iter?=?0
        ??}?else?{
        ???old?=?m.state
        ??}
        ?}
        }

        Unlock

        解鎖分兩種情況

        1. 當前只有一個goroutine占有鎖 unlock完 直接結束
        func?(m?*Mutex)?Unlock()?{

        ?//?去除加鎖狀態(tài)
        ?new?:=?atomic.AddInt32(&m.state,?-mutexLocked)
        ?if?new?!=?0?{//存在等待的goroutine
        ??m.unlockSlow(new)
        ?}
        }
        1. unlock完畢mutex.state!=0 則存在以下可能
          • 直接將鎖交給等待隊列的第一個goroutine
          • 當前存在等待goroutine 然后喚醒它 但不是第一個goroutine
          • 當前存在自旋等待的goroutine 則不喚醒其他等待gorotune
          • 正常模式下
          • 饑餓模式下
        func?(m?*Mutex)?unlockSlow(new?int32)?{
        ??//未加鎖的情況下不能多次調(diào)用unlock
        ?if?(new+mutexLocked)&mutexLocked?==?0?{
        ??throw("sync:?unlock?of?unlocked?mutex")
        ?}
        ?if?new&mutexStarving?==?0?{//正常模式下
        ??old?:=?new
        ??for?{
        ??????//沒有等待的goroutine?或?已經(jīng)存在一個獲得鎖?或被喚醒?或處于饑餓模式下不需要喚醒任何處于等待的goroutine
        ???if?old>>mutexWaiterShift?==?0?||?old&(mutexLocked|mutexWoken|mutexStarving)?!=?0?{
        ????return
        ???}
        ???//?等待狀態(tài)goroutine數(shù)量-1?并設置喚醒狀態(tài)為1?然后喚醒一個等待goroutine
        ???new?=?(old?-?1<???if?atomic.CompareAndSwapInt32(&m.state,?old,?new)?{
        ????????//喚醒一個阻塞的goroutine?但不是第一個等待者
        ????runtime_Semrelease(&m.sema,?false,?1)
        ????return
        ???}
        ???old?=?m.state
        ??}
        ?}?else?{
        ????//饑餓模式下手遞手將鎖交給隊列第一個等待的goroutine
        ????//即使期間有新來的goroutine到來,只要處于饑餓模式?鎖就不會被新來的goroutine搶占
        ??runtime_Semrelease(&m.sema,?true,?1)
        ?}
        }

        信號量

        上面可以看到Mutexgoroutine的阻塞和喚醒操作是利用semaphore來實現(xiàn)的,大致的思路是:Go runtime維護了一個全局的變量semtable,它保持了所有的信號量

        //?Prime?to?not?correlate?with?any?user?patterns.
        const?semTabSize?=?251

        var?semtable?[semTabSize]struct?{
        ?root?semaRoot
        ?pad??[cpu.CacheLinePadSize?-?unsafe.Sizeof(semaRoot{})]byte
        }

        每個信號量都由一個變量地址指定,Mutex的栗子里就是mutex.sema的地址

        type?semaRoot?struct?{
        ?lock??mutex
        ?treap?*sudog?//?root?of?balanced?tree?of?unique?waiters.
        ?nwait?uint32?//?Number?of?waiters.?Read?w/o?the?lock.
        }

        大致畫了下其數(shù)據(jù)結構

        semtable
        1. goroutine未獲取到鎖,需要阻塞時調(diào)用sync.runtime_SemacquireMutex 進入阻塞邏輯
        //go:linkname?sync_runtime_SemacquireMutex?sync.runtime_SemacquireMutex
        func?sync_runtime_SemacquireMutex(addr?*uint32,?lifo?bool,?skipframes?int)?{
        ?semacquire1(addr,?lifo,?semaBlockProfile|semaMutexProfile,?skipframes)
        }

        func?semacquire1(addr?*uint32,?lifo?bool,?profile?semaProfileFlags,?skipframes?int)?{
        ?gp?:=?getg()
        ?if?gp?!=?gp.m.curg?{
        ??throw("semacquire?not?on?the?G?stack")
        ?}

        ?//?低成本case
        ??//?若addr大于1?并通過CAS?-1?成功,則獲取信號量成功?不需要阻塞
        ?if?cansemacquire(addr)?{
        ??return
        ?}

        ?//?復雜?case:
        ?//?增加等待goroutine數(shù)量
        ?//?再次嘗試cansemacquire?成功則返回
        ?//?失敗則將自己作為一個waiter入隊
        ?//?sleep
        ?//?(waiter?descriptor?is?dequeued?by?signaler)
        ?s?:=?acquireSudog()
        ?root?:=?semroot(addr)
        ?t0?:=?int64(0)
        ?s.releasetime?=?0
        ?s.acquiretime?=?0
        ?s.ticket?=?0
        ?if?profile&semaBlockProfile?!=?0?&&?blockprofilerate?>?0?{
        ??t0?=?cputicks()
        ??s.releasetime?=?-1
        ?}
        ?if?profile&semaMutexProfile?!=?0?&&?mutexprofilerate?>?0?{
        ??if?t0?==?0?{
        ???t0?=?cputicks()
        ??}
        ??s.acquiretime?=?t0
        ?}
        ?for?{
        ??lock(&root.lock)
        ??//?給nwait+1?這樣semrelease中不會進低成本路徑了
        ??atomic.Xadd(&root.nwait,?1)
        ??//?檢查?cansemacquire?避免錯過喚醒
        ??if?cansemacquire(addr)?{
        ???atomic.Xadd(&root.nwait,?-1)
        ???unlock(&root.lock)
        ???break
        ??}
        ????//cansemacquire之后的semrelease都可以知道我們正在等待
        ????//上面設置了nwait,所以會直接進入sleep?即goparkunlock
        ??root.queue(addr,?s,?lifo)
        ??goparkunlock(&root.lock,?waitReasonSemacquire,?traceEvGoBlockSync,?4+skipframes)
        ??if?s.ticket?!=?0?||?cansemacquire(addr)?{
        ???break
        ??}
        ?}
        ?if?s.releasetime?>?0?{
        ??blockevent(s.releasetime-t0,?3+skipframes)
        ?}
        ?releaseSudog(s)
        }

        如果addr大于1并通過CAS-1成功則獲取信號量成功,直接返回

        否則通過對信號量地址偏移取模&semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root拿到semaRoot(這里個3和251 沒有明白為什么是這兩個數(shù)???),semaRoot包含了一個sudog鏈表和一個nwait整型字段。nwait表示該信號量上阻塞等待的g的數(shù)量,同時為了保證線程安全需要一個互斥量來保護鏈表。

        這里需要注意的是 此處的runtime.mutex并不是之前所說的sync.Mutex,是內(nèi)部的一個簡單版本

        簡單來說,sync_runtime_Semacquire就是wait知道*s>0 然后原子的遞減它,來完成同步過程中簡單的睡眠原語

        1. goroutine要釋放鎖 喚醒等待的g時調(diào)用sync.runtime_Semrelease
        //go:linkname?sync_runtime_Semrelease?sync.runtime_Semrelease
        func?sync_runtime_Semrelease(addr?*uint32,?handoff?bool,?skipframes?int)?{
        ?semrelease1(addr,?handoff,?skipframes)
        }

        func?semrelease1(addr?*uint32,?handoff?bool,?skipframes?int)?{
        ?root?:=?semroot(addr)
        ?atomic.Xadd(addr,?1)

        ?//?Easy?case:?no?waiters?
        ?//?這個檢查必須發(fā)生在xadd之后?避免錯過喚醒
        ?//?(see?loop?in?semacquire).
        ?if?atomic.Load(&root.nwait)?==?0?{
        ??return
        ?}

        ?//?Harder?case:?搜索一個waiter?并喚醒它
        ?lock(&root.lock)
        ?if?atomic.Load(&root.nwait)?==?0?{
        ??//?count值已經(jīng)被另一個goroutine消費了
        ??//?所以不需要喚醒其他goroutine
        ??unlock(&root.lock)
        ??return
        ?}
        ?s,?t0?:=?root.dequeue(addr)
        ?if?s?!=?nil?{
        ??atomic.Xadd(&root.nwait,?-1)
        ?}
        ?unlock(&root.lock)
        ?if?s?!=?nil?{?//?May?be?slow,?so?unlock?first
        ??acquiretime?:=?s.acquiretime
        ??if?acquiretime?!=?0?{
        ???mutexevent(t0-acquiretime,?3+skipframes)
        ??}
        ??if?s.ticket?!=?0?{
        ???throw("corrupted?semaphore?ticket")
        ??}
        ??if?handoff?&&?cansemacquire(addr)?{
        ???s.ticket?=?1
        ??}
        ??readyWithTime(s,?5+skipframes)
        ?}
        }

        關于信號量更深層的研究可以看下semaphore in plan9

        總結

        通過看源碼發(fā)現(xiàn)個有意思的問題:如果goroutine g1加的鎖 可以被另一個goroutine g2解鎖,但是等到g1解鎖的時候就會panic



        推薦閱讀


        福利

        我為大家整理了一份從入門到進階的Go學習資料禮包,包含學習建議:入門看什么,進階看什么。關注公眾號 「polarisxu」,回復?ebook?獲取;還可以回復「進群」,和數(shù)萬 Gopher 交流學習。

        瀏覽 104
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 天天射天天操天天日 | 亚洲AV无码专区在线观看播放 | 99re视频精品 | 太涨了好深到宫口h | 少妇淫片免费看大片动漫版app |