您现在的位置是:网站首页> 编程资料编程资料
深度解密 Go 语言中的 sync.map_Golang_
2023-05-26
407人已围观
简介 深度解密 Go 语言中的 sync.map_Golang_
工作中,经常会碰到并发读写 map 而造成 panic 的情况,为什么在并发读写的时候,会 panic 呢?因为在并发读写的情况下,map 里的数据会被写乱,之后就是 Garbage in, garbage out,还不如直接 panic 了。
是什么
Go 语言原生 map 并不是线程安全的,对它进行并发读写操作的时候,需要加锁。而 sync.map 则是一种并发安全的 map,在 Go 1.9 引入。
sync.map是线程安全的,读取,插入,删除也都保持着常数级的时间复杂度。sync.map的零值是有效的,并且零值是一个空的 map。在第一次使用之后,不允许被拷贝。
有什么用
一般情况下解决并发读写 map 的思路是加一把大锁,或者把一个 map 分成若干个小 map,对 key 进行哈希,只操作相应的小 map。前者锁的粒度比较大,影响效率;后者实现起来比较复杂,容易出错。
而使用 sync.map 之后,对 map 的读写,不需要加锁。并且它通过空间换时间的方式,使用 read 和 dirty 两个 map 来进行读写分离,降低锁时间来提高效率。
如何使用
使用非常简单,和普通 map 相比,仅遍历的方式略有区别:
package main import ( "fmt" "sync" ) func main() { var m sync.Map // 1. 写入 m.Store("qcrao", 18) m.Store("stefno", 20) // 2. 读取 age, _ := m.Load("qcrao") fmt.Println(age.(int)) // 3. 遍历 m.Range(func(key, value interface{}) bool { name := key.(string) age := value.(int) fmt.Println(name, age) return true }) // 4. 删除 m.Delete("qcrao") age, ok := m.Load("qcrao") fmt.Println(age, ok) // 5. 读取或写入 m.LoadOrStore("stefno", 100) age, _ = m.Load("stefno") fmt.Println(age) }第 1 步,写入两个 k-v 对;
第 2 步,使用 Load 方法读取其中的一个 key;
第 3 步,遍历所有的 k-v 对,并打印出来;
第 4 步,删除其中的一个 key,再读这个 key,得到的就是 nil;
第 5 步,使用 LoadOrStore,尝试读取或写入 "Stefno",因为这个 key 已经存在,因此写入不成功,并且读出原值。
程序输出:
18
stefno 20
qcrao 18false
20
sync.map 适用于读多写少的场景。对于写多的场景,会导致 read map 缓存失效,需要加锁,导致冲突变多;而且由于未命中 read map 次数过多,导致 dirty map 提升为 read map,这是一个 O(N) 的操作,会进一步降低性能。
源码分析
数据结构
先来看下 map 的数据结构。去掉大段的注释后:
type Map struct { mu Mutex read atomic.Value // readOnly dirty map[interface{}]*entry misses int }互斥量 mu 保护 read 和 dirty。
read 是 atomic.Value 类型,可以并发地读。但如果需要更新 read,则需要加锁保护。对于 read 中存储的 entry 字段,可能会被并发地 CAS 更新。但是如果要更新一个之前已被删除的 entry,则需要先将其状态从 expunged 改为 nil,再拷贝到 dirty 中,然后再更新。
dirty 是一个非线程安全的原始 map。包含新写入的 key,并且包含 read 中的所有未被删除的 key。这样,可以快速地将 dirty 提升为 read 对外提供服务。如果 dirty 为 nil,那么下一次写入时,会新建一个新的 dirty,这个初始的 dirty 是 read 的一个拷贝,但除掉了其中已被删除的 key。
每当从 read 中读取失败,都会将 misses 的计数值加 1,当加到一定阈值以后,需要将 dirty 提升为 read,以期减少 miss 的情形。
read map和dirty map的存储方式是不一致的。
前者使用 atomic.Value,后者只是单纯的使用 map。
原因是 read map 使用 lock free 操作,必须保证 load/store 的原子性;而 dirty map 的 load+store 操作是由 lock(就是 mu)来保护的。
真正存储 key/value 的是 read 和 dirty 字段。read 使用 atomic.Value,这是 lock-free 的基础,保证 load/store 的原子性。dirty 则直接用了一个原始的 map,对于它的 load/store 操作需要加锁。
read 字段里实际上是存储的是:
// readOnly is an immutable struct stored atomically in the Map.read field. type readOnly struct { m map[interface{}]*entry amended bool // true if the dirty map contains some key not in m. }注意到 read 和 dirty 里存储的东西都包含 entry,来看一下:
type entry struct { p unsafe.Pointer // *interface{} }很简单,它是一个指针,指向 value。看来,read 和 dirty 各自维护一套 key,key 指向的都是同一个 value。也就是说,只要修改了这个 entry,对 read 和 dirty 都是可见的。这个指针的状态有三种:

当 p == nil 时,说明这个键值对已被删除,并且 m.dirty == nil,或 m.dirty[k] 指向该 entry。
当 p == expunged 时,说明这条键值对已被删除,并且 m.dirty != nil,且 m.dirty 中没有这个 key。
其他情况,p 指向一个正常的值,表示实际 interface{} 的地址,并且被记录在 m.read.m[key] 中。如果这时 m.dirty 不为 nil,那么它也被记录在 m.dirty[key] 中。两者实际上指向的是同一个值。
当删除 key 时,并不实际删除。一个 entry 可以通过原子地(CAS 操作)设置 p 为 nil 被删除。如果之后创建 m.dirty,nil 又会被原子地设置为 expunged,且不会拷贝到 dirty 中。
如果 p 不为 expunged,和 entry 相关联的这个 value 可以被原子地更新;如果 p == expunged,那么仅当它初次被设置到 m.dirty 之后,才可以被更新。
整体用一张图来表示:

Store
先来看 expunged:
var expunged = unsafe.Pointer(new(interface{}))它是一个指向任意类型的指针,用来标记从 dirty map 中删除的 entry。
// Store sets the value for a key. func (m *Map) Store(key, value interface{}) { // 如果 read map 中存在该 key 则尝试直接更改(由于修改的是 entry 内部的 pointer,因此 dirty map 也可见) read, _ := m.read.Load().(readOnly) if e, ok := read.m[key]; ok && e.tryStore(&value) { return } m.mu.Lock() read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok { if e.unexpungeLocked() { // 如果 read map 中存在该 key,但 p == expunged,则说明 m.dirty != nil 并且 m.dirty 中不存在该 key 值 此时: // a. 将 p 的状态由 expunged 更改为 nil // b. dirty map 插入 key m.dirty[key] = e } // 更新 entry.p = value (read map 和 dirty map 指向同一个 entry) e.storeLocked(&value) } else if e, ok := m.dirty[key]; ok { // 如果 read map 中不存在该 key,但 dirty map 中存在该 key,直接写入更新 entry(read map 中仍然没有这个 key) e.storeLocked(&value) } else { // 如果 read map 和 dirty map 中都不存在该 key,则: // a. 如果 dirty map 为空,则需要创建 dirty map,并从 read map 中拷贝未删除的元素到新创建的 dirty map // b. 更新 amended 字段,标识 dirty map 中存在 read map 中没有的 key // c. 将 kv 写入 dirty map 中,read 不变 if !read.amended { // 到这里就意味着,当前的 key 是第一次被加到 dirty map 中。 // store 之前先判断一下 dirty map 是否为空,如果为空,就把 read map 浅拷贝一次。 m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true}) } // 写入新 key,在 dirty 中存储 value m.dirty[key] = newEntry(value) } m.mu.Unlock() }整体流程:
- 如果在 read 里能够找到待存储的 key,并且对应的 entry 的 p 值不为 expunged,也就是没被删除时,直接更新对应的 entry 即可。
- 第一步没有成功:要么 read 中没有这个 key,要么 key 被标记为删除。则先加锁,再进行后续的操作。
- 再次在 read 中查找是否存在这个 key,也就是 double check 一下,这也是 lock-free 编程里的常见套路。如果 read 中存在该 key,但 p == expunged,说明 m.dirty != nil 并且 m.dirty 中不存在该 key 值 此时: a. 将 p 的状态由 expunged 更改为 nil;b. dirty map 插入 key。然后,直接更新对应的 value。
- 如果 read 中没有此 key,那就查看 dirty 中是否有此 key,如果有,则直接更新对应的 value,这时 read 中还是没有此 key。
- 最后一步,如果 read 和 dirty 中都不存在该 key,则:a. 如果 dirty 为空,则需要创建 dirty,并从 read 中拷贝未被删除的元素;b. 更新 amended 字段,标识 dirty map 中存在 read map 中没有的 key;c. 将 k-v 写入 dirty map 中,read.m 不变。最后,更新此 key 对应的 value。
再来看一些子函数:
// 如果 entry 没被删,tryStore 存储值到 entry 中。如果 p == expunged,即 entry 被删,那么返回 false。 func (e *entry) tryStore(i *interface{}) bool { for { p := atomic.LoadPointer(&e.p) if p == expunged { return false } if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { return true } } }tryStore 在 Store 函数最开始的时候就会调用,是比较常见的 for 循环加 CAS 操作,尝试更新 entry,让 p 指向新的值。
unexpungeLocked 函数确保了 entry 没有被标记成已被清除:
// unexpungeLocked 函数确保了 entry 没有被标记成已被清除。 // 如果 entry 先前被清除过了,那么在 mutex 解锁之前,它一定要被加入到 dirty map 中 func (e *entry) unexpungeLocked() (wasExpunged bool) { return atomic.CompareAndSwapPointer(&e.p, expunged, nil) }Load
func (m *Map) Load(key interface{}) (value interface{}, ok bool) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] // 如果没在 read 中找到,并且 amended 为 true,即 dirty 中存在 read 中没有的 key if !ok && read.amended { m.mu.Lock() // dirty map 不是线程安全的,所以需要加上互斥锁 // double check。避免在上锁的过程中 dirty map 提升为 read map。 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] // 仍然没有在 read 中找到这个 key,并且 amended 为 true if !ok && read.amended { e, ok = m.dirty[key] // 从 dirty 中找 // 不管 dirty 中有没有找到,都要"记一笔",因为在 dirty 提升为 read 之前,都会进入这条路径 m.missLocked() } m.mu.Unlock() } if !ok { // 如果没找到,返回空,false return nil, false } return e.load() }处理路径分为 fast path 和 slow path,整体流程如下:
- 首先是 fast path,直接在 read 中找,如果找到了直接调用 entry 的 load 方法,取出其中的值。
- 如果 read 中没有这个 key,且 amended 为 fase,说明 dirty 为空,那直接返回 空和 false。
- 如果 read 中没有这个 key,且 amended 为 true,说明 dirty 中可能存在我们要找的 key。当然要先上锁,再尝试去 dirty 中查找。在这之前,仍然有一个 double check 的操作。若还是没有在 read 中找到,那么就从 dirty 中找。不管 dirty 中有没有找到,都要"记一笔",因为在 dirty 被提升为 read 之前,都会进入这条路径
这里主要看下 missLocked 的函数的实现:
func (m *Map) missLocked() { m.misses++ if m.misses < len(m.dirty) { return } // dirty map 晋升 m.read.Store(readOnly{m: m.dirty}) m.dirty = nil m.misses = 0 }直接将 misses 的值加 1,表示一次未命中,如果 misses 值小于 m.dirty 的长度,就直接返回。否则,将 m.dirty 晋升为 read,并清空 dirty,清空 misses 计数值。这样,之前一段时间新加入的 key 都会进入到 read 中,从而能够提升 read 的命中率。
再来看下 entry 的 load 方法:
func (e *entry) load() (value interface{}, ok bool) { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return nil, false } return *(*interface{})(p), true }对于 nil 和 expunged 状态的 entry,直接返回 ok=false;否则,将
