淺談Golang兩種線程安全的map
golang map
1. 并發(fā)讀寫測試
在golang中原生map 在并發(fā)場景下,同時讀寫是線程不安全的,無論key是否一樣。以下是測試代碼
package mainimport "time"func main() {testMapReadWriteDiffKey()}func testMapReadWriteDiffKey() {m := make(map[int]int)go func() {for {m[100] = 100}}()go func() {for {_ = m[12]}}()select {}}

相關(guān)文章分析 https://medium.com/a-journey-with-go/go-concurrency-access-with-maps-part-iii-8c0a0e4eb27e
go官方博客 https://go.dev/blog/race-detector
文章分析 https://medium.com/@blanchon.vincent/go-race-detector-with-threadsanitizer-8e497f9e42db
視頻講解
https://www.youtube.com/watch?v=5erqWdlhQLA
2. map+讀寫鎖
在官方庫sync.map沒出來前,Go maps in action推薦的做法是使用map+RWLock,比如定義一個匿名struct變量,其包含map、RWLock,如下所示
var counter = struct{sync.RWMutexm map[string]int}{m: make(map[string]int)}
可以這樣從counter中讀數(shù)據(jù)
counter.RLock()n := counter.m["some_key"]counter.RUnlock()fmt.Println("some_key:", n)
可以這樣往counter中寫數(shù)據(jù)
counter.Lock()counter.m["some_key"]++counter.Unlock()
那Go 1.9版本實現(xiàn)的sync.map和上面的這種實現(xiàn)方式有什么不同?它適用于哪些場景呢?它在哪些方面做了性能優(yōu)化呢?
sync.map
sync.map是用讀寫分離實現(xiàn)的,其思想是空間換時間。和map+RWLock的實現(xiàn)方式相比,它做了一些優(yōu)化:可以無鎖訪問read map,而且會優(yōu)先操作read map,倘若只操作read map就可以滿足要求(增刪改查遍歷),那就不用去操作write map(它的讀寫都要加鎖),所以在某些特定場景中它發(fā)生鎖競爭的頻率會遠遠小于map+RWLock的實現(xiàn)方式。
接下來著重介紹下sync.map源碼,以了解其運作原理。
sync.map源碼 https://github.com/golang/go/blob/master/src/sync/map.go
1. 變量介紹
1.1 結(jié)構(gòu)體Map
type Map struct {?//?互斥鎖mu,操作dirty需先獲取mumu Mutex// read是只讀的數(shù)據(jù)結(jié)構(gòu),訪問它無須加鎖,sync.map的所有操作都優(yōu)先讀read//?read中存儲結(jié)構(gòu)體readOnly,readOnly中存著真實數(shù)據(jù)---entry(詳見1.3),read是dirty的子集//?read中可能會存在臟數(shù)據(jù):即entry被標記為已刪除(詳見1.3)read atomic.Value // readOnly// dirty是可以同時讀寫的數(shù)據(jù)結(jié)構(gòu),訪問它要加鎖,新添加的key都會先放到dirty中// dirty == nil的情況:1.被初始化 2.提升為read后,但它不能一直為nil,否則read和dirty會數(shù)據(jù)不一致。// 當(dāng)有新key來時,會用read中的數(shù)據(jù) (不是read中的全部數(shù)據(jù),而是未被標記為已刪除的數(shù)據(jù),詳見3.2)填充dirty// dirty != nil時它存著sync.map的全部數(shù)據(jù)(包括read中未被標記為已刪除的數(shù)據(jù)和新來的數(shù)據(jù))dirty map[interface{}]*entry// 統(tǒng)計訪問read沒有未命中然后穿透訪問dirty的次數(shù)// 若miss等于dirty的長度,dirty會提升成read,提升后可以增加read的命中率,減少加鎖訪問dirty的次數(shù)misses int}
1.2 結(jié)構(gòu)體readOnly
type readOnly struct {m map[interface{}]*entryamended bool}
1.3 結(jié)構(gòu)體entry
type entry struct {// p == nil:entry已從readOnly中刪除但存在于dirty中// p == expunged:entry已從Map中刪除且不在dirty中// p == 其他值:entry為正常值p unsafe.Pointer // *interface{}}
entry中的指針p指向真正的value所在的地址,dirty和readOnly.m存的值類型就是*entry。這里的nil和expunged有什么作用呢?只要nil不可以嗎?對于這些問題后面會一一解讀。
2. 函數(shù)介紹
2.1 Load方法

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {// 從m.read中換出readOnly,然后從里面找key,這個過程不加鎖read, _ := m.read.Load().(readOnly)e, ok := read.m[key]// readOnly中不存在此key但Map.dirty可能存在if !ok && read.amended {// 加鎖訪問Map.dirtym.mu.Lock()// 雙重檢測:若加鎖前Map.dirty被替換為readonly,則前面m.read.Load().(readOnly)無效,需// 要再次檢查read, _ = m.read.Load().(readOnly)e, ok = read.m[key]// read.m沒有此key && dirty里有可能有(dirty中有read.m沒有的數(shù)據(jù))if !ok && read.amended {// 從dirty中獲取key對應(yīng)的entrye, ok = m.dirty[key]// 無論Map.dirty中是否有這個key,miss都加一,若miss大小等于dirty的長度,dirty中的元素會被// 加到Map.read中m.missLocked()}m.mu.Unlock()}if !ok {return nil, false}// 若entry.p被刪除(等于nil或expunged)返回nil和不存在(false),否則返回對應(yīng)的值和存在(true)return e.load()}
Map.dirty是如何提升為Map.read的呢?讓我們來看下missLocked方法
func (m *Map) missLocked() {??//?訪問一次Map.dirty,misses就要加一m.misses++if m.misses < len(m.dirty) {return}??//?當(dāng)misses等于dirty的長度,m.dirty提升為readOnly,amended被默認賦值成falsem.read.Store(readOnly{m: m.dirty})m.dirty = nilm.misses = 0}
小結(jié):
Load方法會優(yōu)先無鎖訪問readOnly,未命中后如果Map.dirty中可能存在這個數(shù)據(jù)就會加鎖訪問Map.dirty Load方法如果訪問readOnly中不存在但dirty中存在的key,就要加鎖訪問Map.dirty從而帶來額外開銷。
2.2 Store方法

func (m *Map) Store(key, value interface{}) {// 把m.read轉(zhuǎn)成結(jié)構(gòu)體readOnlyread, _ := m.read.Load().(readOnly)// 若key在readOnly.m中且entry.p不為expunged(沒有標記成已刪除)即key同時存在于readOnly.m和dirty// ,用CAS技術(shù)更新value 【注】:e.tryStore在entry.p == expunged時會立刻返回false,否則用CAS// 嘗試更新對應(yīng)的value, 更新成功會返回trueif e, ok := read.m[key]; ok && e.tryStore(&value) {return}// key不存在于readOnly.m或者entry.p==expunged(entry被標記為已刪除),加鎖訪問dirtym.mu.Lock()// 雙重檢測:若加鎖前Map.dirty被提升為readOnly,則前面的read.m[key]可能無效,所以需要再次檢測key是// 否存在于readOnly中read, _ = m.read.Load().(readOnly)// 若key在于readOnly.m中if e, ok := read.m[key]; ok {// entry.p之前的狀態(tài)是expunged,把它置為nilif e.unexpungeLocked() {// 之前dirty中沒有此key,所以往dirty中添加此keym.dirty[key] = e}// 更新(把value的地址原子賦值給指針entry.p)e.storeLocked(&value)// 若key在dirty中} else if e, ok := m.dirty[key]; ok {// 更新(把value的地址原子賦值給指針entry.p)e.storeLocked(&value)// 來了個新key} else {// dirty中沒有新數(shù)據(jù),往dirty中添加第一個新keyif !read.amended {// 把readOnly中未標記為刪除的數(shù)據(jù)拷貝到dirty中m.dirtyLocked()// amended:true,因為現(xiàn)在dirty有readOnly中沒有的keym.read.Store(readOnly{m: read.m, amended: true})}// 把這個新的entry加到dirty中m.dirty[key] = newEntry(value)}m.mu.Unlock()}
函數(shù)tryStore代碼如下:
func (e *entry) tryStore(i *interface{}) bool {for {p := atomic.LoadPointer(&e.p)if p == expunged {return false}if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {return true}}}
func (e *entry) unexpungeLocked() (wasExpunged bool) {return atomic.CompareAndSwapPointer(&e.p, expunged, nil)}func (m *Map) dirtyLocked() {// 只要調(diào)用dirtyLocked,此時dirty肯定等于nilif m.dirty != nil {return}? //?dirty為nil時,把readOnly中沒被標記成刪除的entry添加到dirtyread, _ := m.read.Load().(readOnly)m.dirty = make(map[interface{}]*entry, len(read.m))for k, e := range read.m {????//?tryExpungeLocked函數(shù)在entry未被刪除時【e.p!=expunged&&e.p!=nil】返回false,在????//?e.p==nil時會將其置為expunged并返回trueif !e.tryExpungeLocked() {m.dirty[k] = e // entry沒被刪除,把它添加到dirty中}}}
小結(jié):
Store方法優(yōu)先無鎖訪問readOnly,未命中會加鎖訪問dirty
Store方法中的雙重檢測機制在下面的Load、Delete、Range方法中都會用到,原因是:加鎖前Map.dirty可能已被提升為Map.read,所以加鎖后還要再次檢查key是否存在于Map.read中 dirtyLocked方法在dirty為nil(剛被提升成readOnly或者Map初始化時)會從readOnly中拷貝數(shù)據(jù),如果readOnly中數(shù)據(jù)量很大,可能偶爾會出現(xiàn)性能抖動。 sync.map不適合用于頻繁插入新key-value的場景,因為此操作會頻繁加鎖訪問dirty會導(dǎo)致性能下降。更新操作在key存在于readOnly中且值沒有被標記為刪除(expunged)的場景下會用無鎖操作CAS進行性能優(yōu)化,否則也會加鎖訪問dirty
2.3 Delete方法

func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {??//?從m.read中換出readOnly,然后從里面找key,此過程不加鎖read, _ := m.read.Load().(readOnly)e, ok := read.m[key]??//?readOnly不存在此key,但dirty中可能存在if !ok && read.amended {????//?加鎖訪問dirtym.mu.Lock()// 雙重檢測:若加鎖前Map.dirty被替換為readonly,則前面m.read.Load().(readOnly)無// 效,需要再次檢查read, _ = m.read.Load().(readOnly)e, ok = read.m[key]????//?readOnly不存在此key,但是dirty中可能存在if !ok && read.amended {e, ok = m.dirty[key]delete(m.dirty, key)m.missLocked()}m.mu.Unlock()}if ok {// 如果entry.p不為nil或者expunged,則把entry.p軟刪除(標記為nil)return e.delete()}return nil, false}
delete函數(shù)代碼如下:
func (e *entry) delete() (value interface{}, ok bool) {for {p := atomic.LoadPointer(&e.p)if p == nil || p == expunged {return nil, false}????//?e.p是真實值,把它置為nilif atomic.CompareAndSwapPointer(&e.p, p, nil) {return *(*interface{})(p), true}}}
刪除readOnly中存在的key,可以不用加鎖 如果刪除readOnly中不存在的或者Map中不存在的key,都需要加鎖。
2.4 Range方法

Range方法可遍歷Map,參數(shù)是個函數(shù)(入?yún)ⅲ簁ey和value,返回值:是否停止遍歷Range方法)
func (m *Map) Range(f func(key, value interface{}) bool) {read, _ := m.read.Load().(readOnly)// dirty存在readOnly中不存在的元素if read.amended {// 加鎖訪問dirtym.mu.Lock()// 再次檢測read.amended,因為加鎖前它可能已由true變成falseread, _ = m.read.Load().(readOnly)if read.amended {// readOnly.amended被默認賦值成falseread = readOnly{m: m.dirty}m.read.Store(read)m.dirty = nilm.misses = 0}m.mu.Unlock()}// 遍歷readOnly.mfor k, e := range read.m {v, ok := e.load()if !ok {continue}if !f(k, v) {break}}}
Range方法Map的全部key都存在于readOnly中時,是無鎖遍歷的,性能最高 Range方法在readOnly只存在Map中的部分key時,會一次性加鎖拷貝dirty的元素到readOnly,減少多次加鎖訪問dirty中的數(shù)據(jù)
3. sync.map總結(jié)
3.1 使用場景
3.2 設(shè)計點:expunged
entry.p取值有3種,nil、expunged和指向真實值。那expunged出現(xiàn)在什么時候呢?為什么要有expunged的設(shè)計呢?它有什么作用呢?
什么時候expunged會出現(xiàn)呢?
當(dāng)用Store方法插入新key時,會加鎖訪問dirty,并把readOnly中的未被標記為刪除的所有entry指針復(fù)制到dirty,此時之前被Delete方法標記為軟刪除的entry(entry.p被置為nil)都變?yōu)閑xpunged,那這些被標記為expunged的entry將不會出現(xiàn)在dirty中。
反向思維,如果沒有expunged,只有nil會出現(xiàn)什么結(jié)果呢?
直接刪掉entry==nil的元素,而不是置為expunged:在用Store方法插入新key時,readOnly數(shù)據(jù)拷貝到dirty時直接把為ni的entry刪掉。但這要對readOnly加鎖,sync.map設(shè)計理念是讀寫分離,所以訪問readOnly不能加鎖。
不刪除entry==nil的元素,全部拷貝:在用Store方法插入新key時,readOnly中entry.p為nil的數(shù)據(jù)全部拷貝到dirty中。那么在dirty提升為readOnly后這些已被刪除的臟數(shù)據(jù)仍會保留,也就是說它們會永遠得不到清除,占用的內(nèi)存會越來越大。
不拷貝entry.p==nil的元素:在用Store方法插入新key時,不把readOnly中entry.p為nil的數(shù)據(jù)拷貝到dirty中,那在用Store更新值時,就會出現(xiàn)readOnly和dirty不同步的狀態(tài),即readOnly中存在dirty中不存在的key,那dirty提升為readOnly時會出現(xiàn)數(shù)據(jù)丟失的問題。
4. sync.map的其他問題
實現(xiàn)len方法要統(tǒng)計readOnly和dirty的數(shù)據(jù)量,勢必會引入鎖競爭,導(dǎo)致性能下降,還會額外增加代碼實現(xiàn)復(fù)雜度 對sync.map的并發(fā)操作導(dǎo)致其數(shù)據(jù)量可能變化很快,len方法的統(tǒng)計結(jié)果參考價值不大。
orcanman/concurrent-map
concurrent-map源碼地址 https://github.com/orcaman/concurrent-map

1. 數(shù)據(jù)結(jié)構(gòu)
// SHARD_COUNT 分片大小var SHARD_COUNT = 32type ConcurrentMap []*ConcurrentMapShared// ConcurrentMapShared 分片的并發(fā)maptype ConcurrentMapShared struct {items map[string]interface{}sync.RWMutex // 訪問內(nèi)部map都需要先獲取讀寫鎖}// New 創(chuàng)建一個concurrent map.func New() ConcurrentMap {m := make(ConcurrentMap, SHARD_COUNT)for i := 0; i < SHARD_COUNT; i++ {m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}}return m}
2. 函數(shù)介紹
2.1 GET方法
// 先hash拿到key對應(yīng)的分區(qū)號,然后加鎖,讀取值,最后釋放鎖和返回func (m ConcurrentMap) Get(key string) (interface{}, bool) {// Get shardshard := m.GetShard(key)shard.RLock()// Get item from shard.val, ok := shard.items[key]shard.RUnlock()return val, ok}
2.2 SET方法
// 先hash拿到key對應(yīng)的分區(qū)號,然后加鎖,設(shè)置新值,最后釋放鎖func (m ConcurrentMap) Set(key string, value interface{}) {// Get map shard.shard := m.GetShard(key)shard.Lock()shard.items[key] = valueshard.Unlock()}
2.3 Remove方法
// 先hash拿到key對應(yīng)的分區(qū)號,然后加鎖,刪除key,最后釋放鎖func (m ConcurrentMap) Remove(key string) {// Try to get shard.shard := m.GetShard(key)shard.Lock()delete(shard.items, key)shard.Unlock()}
2.4 Count方法
// 分別拿到每個分片map中的元素數(shù)量,然后匯總后返回func (m ConcurrentMap) Count() int {count := 0for i := 0; i < SHARD_COUNT; i++ {shard := m[i]shard.RLock()count += len(shard.items)shard.RUnlock()}return count}
2.5 Upsert方法
// 先hash拿到key對應(yīng)的分區(qū)號,然后加鎖,如果key存在就更新其value,否則插入新的k-v,釋放鎖并返回func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {shard := m.GetShard(key)shard.Lock()v, ok := shard.items[key]res = cb(ok, v, value)shard.items[key] = resshard.Unlock()return res}
后續(xù)
參考鏈接
https://stackoverflow.com/questions/45585589/golang-fatal-error-concurrent-map-read-and-map-write/45585833
https://github.com/golang/go/issues/20680
https://github.com/golang/go/blob/master/src/sync/map.gohttps://github.com/orcaman/concurrent-map
推薦閱讀
