在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
原文链接:https://segmentfault.com/a/1190000018448064 作者:薛薛薛 分断锁type SimpleCache struct { mu sync.RWMutex items map[interface{}]*simpleItem } 在日常开发中, 上述这种数据结构肯定不少见,因为golang的原生map是非并发安全的,所以为了保证map的并发安全,最简单的方式就是给map加锁。 分段锁的实现// Map 分片 type ConcurrentMap []*ConcurrentMapShared // 每一个Map 是一个加锁的并发安全Map type ConcurrentMapShared struct { items map[string]interface{} sync.RWMutex // 各个分片Map各自的锁 } 主流的分段锁,即通过hash取模的方式找到当前访问的key处于哪一个分片之上,再对该分片进行加锁之后再读写。分片定位时,常用有BKDR, FNV32等hash算法得到key的hash值。 func New() ConcurrentMap { // SHARD_COUNT 默认32个分片 m := make(ConcurrentMap, SHARD_COUNT) for i := 0; i < SHARD_COUNT; i++ { m[i] = &ConcurrentMapShared{ items: make(map[string]interface{}), } } return m } 在初始化好分片后, 对分片上的数据进行读写时就需要用hash取模进行分段定位来确认即将要读写的分片。 获取段定位func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared { return m[uint(fnv32(key))%uint(SHARD_COUNT)] } // FNV hash func fnv32(key string) uint32 { hash := uint32(2166136261) const prime32 = uint32(16777619) for i := 0; i < len(key); i++ { hash *= prime32 hash ^= uint32(key[i]) } return hash } 之后对于map的GET SET 就简单顺利成章的完成 Set And Getfunc (m ConcurrentMap) Set(key string, value interface{}) { shard := m.GetShard(key) // 段定位找到分片 shard.Lock() // 分片上锁 shard.items[key] = value // 分片操作 shard.Unlock() // 分片解锁 } func (m ConcurrentMap) Get(key string) (interface{}, bool) { shard := m.GetShard(key) shard.RLock() val, ok := shard.items[key] shard.RUnlock() return val, ok } 由此一个分段锁Map就实现了, 但是比起普通的Map, 常用到的方法比如获取所有key, 获取所有Val 操作是要比原生Map复杂的,因为要遍历每一个分片的每一个数据, 好在golang的并发特性使得解决这类问题变得非常简单 Keys// 统计当前分段map中item的个数 func (m ConcurrentMap) Count() int { count := 0 for i := 0; i < SHARD_COUNT; i++ { shard := m[i] shard.RLock() count += len(shard.items) shard.RUnlock() } return count } // 获取所有的key func (m ConcurrentMap) Keys() []string { count := m.Count() ch := make(chan string, count) // 每一个分片启动一个协程 遍历key go func() { wg := sync.WaitGroup{} wg.Add(SHARD_COUNT) for _, shard := range m { go func(shard *ConcurrentMapShared) { defer wg.Done() shard.RLock() // 每个分片中的key遍历后都写入统计用的channel for key := range shard.items { ch <- key } shard.RUnlock() }(shard) } wg.Wait() close(ch) }() keys := make([]string, count) // 统计各个协程并发读取Map分片的key for k := range ch { keys = append(keys, k) } return keys } 这里写了一个benchMark来对该分段锁Map和原生的Map加锁方式进行压测, 场景为将一万个不重复的键值对同时以100万次写和100万次读,分别进行5次压测, 如下压测代码 func BenchmarkMapShared(b *testing.B) { num := 10000 testCase := genNoRepetTestCase(num) // 10000个不重复的键值对 m := New() for _, v := range testCase { m.Set(v.Key, v.Val) } b.ResetTimer() for i := 0; i < 5; i++ { b.Run(strconv.Itoa(i), func(b *testing.B) { b.N = 1000000 wg := sync.WaitGroup{} wg.Add(b.N * 2) for i := 0; i < b.N; i++ { e := testCase[rand.Intn(num)] go func(key string, val interface{}) { m.Set(key, val) wg.Done() }(e.Key, e.Val) go func(key string) { _, _ = m.Get(key) wg.Done() }(e.Key) } wg.Wait() }) } } 原生Map加锁压测结果 分段锁压测结果 可以看出在将锁的粒度细化后再面对大量需要控制并发安全的访问时,分段锁Map的耗时比原生Map加锁要快3倍有余 |
请发表评论