• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

4. Go并发编程--Mutex/RWMutex

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

一.前言

我们反复提到了goroutine的创建时简单的。 但是仍然要小心, 习惯总是会导致我们可能写出一些bug.对于语言规范没有定义的内容不要做任何的假设。

需要通过同步语义来控制代码的执行顺序 这一点很重要。 这些包提供了一些基础的同步语义,但是在实际的并发编程当中,我们应该使用 channel 来进行同步控制。

二. Mutex

2.1 案例

上一篇文章中有这样一个例子

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup
var counter int

func main() {
	// 多跑几次来看结果
	for i := 0; i < 100000; i++ {
		run()
	}
	fmt.Printf("Final Counter: %d\n", counter)
}


func run() {
    // 开启两个 协程,操作
	for i := 1; i <= 2; i++ {
		wg.Add(1)
		go routine(i)
	}
	wg.Wait()
}

func routine(id int) {
	for i := 0; i < 2; i++ {
		value := counter
		value++
		counter = value
	}
	wg.Done()
}

测试后我们会发现,每次执行的结果都不一样, 那么如何运用Mutex进行修改呢?

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup
var counter int
var mu sync.Mutex

func main() {
	// 多跑几次来看结果
	for i := 0; i < 100000; i++ {
		run()
	}
	fmt.Printf("Final Counter: %d\n", counter)
}


func run() {
	for i := 1; i <= 2; i++ {
		wg.Add(1)
		go routine(i)
	}
	wg.Wait()
	fmt.Printf("Final Counter: %d\n", counter)
}

func routine(id int) {
	for i := 0; i < 2; i++ {
	    // 加锁
		mu.Lock()
		counter++
		// 解锁
		mu.Unlock()
	}
	wg.Done()
}

这里主要的目的就是为了保护我们临界区的数据,通过锁来进行保证。锁的使用非常的简单,但是还是有几个需要注意的点

  • 锁的范围要尽量的小,不要搞很多大锁
  • 用锁一定要解锁,小心产生死锁

三. 实现原理

3.1 锁的实现模式

  • Barging: 这种模式是为了提高吞吐量,当锁被释放时,它会唤醒第一个等待者,然后把锁给第一个等待者或者给第一个请求锁的人
  • Handoff: 当锁释放的时候, 锁会一直持有直到第一个等待者准备好获取锁。 它降低了吞吐量,因为锁被持有, 即使另一个 goroutine 准备获取它。这种模式可以解决公平性的问题,因为在 Barging 模式下可能会存在被唤醒的 goroutine 永远也获取不到锁的情况,毕竟一直在 cpu 上跑着的 goroutine 没有上下文切换会更快一些。缺点就是性能会相对差一些
  • Spining:自旋在等待队列为空或者应用程序重度使用锁时效果不错。Parking 和 Unparking goroutines 有不低的性能成本开销,相比自旋来说要慢得多。但是自旋是有成本的,所以在 go 的实现中进入自旋的条件十分的苛刻。

3.2 Go Mutex 实现原理

3.2.1 加锁

  1. 首先如果当前锁处于初始化状态就直接用CAS方法尝试获取锁,这是 Fast Path
  2. 如果失败就进入 Slow Path
    • 会首先判断当前能不能进入自旋状态,如果可以就进入自旋,最多自旋 4 次
    • 自旋完成之后,就会去计算当前的锁的状态
    • 然后尝试通过 CAS 获取锁
    • 如果没有获取到就调用 runtime_SemacquireMutex 方法休眠当前 goroutine 并且尝试获取信号量
    • goroutine 被唤醒之后会先判断当前是否处在饥饿状态,(如果当前 goroutine 超过 1ms 都没有获取到锁就会进饥饿模式)
      1. 如果处在饥饿状态就会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出
      2. 如果不在,就会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环

CAS 方法在这里指的是 atomic.CompareAndSwapInt32(addr, old, new) bool 方法,这个方法会先比较传入的地址的值是否是 old,如果是的话就尝试赋新值,如果不是的话就直接返回 false,返回 true 时表示赋值成功
饥饿模式是 Go 1.9 版本之后引入的优化,用于解决公平性的问题[10]

3.2.2 解锁

解锁的流程相对于加锁简单很多,这里直接上图,过程不过多赘述

四. 源码分析

4.1 Mutex基本结构

Mutex是个结构体,源码如下

type Mutex struct {
	state int32
	sema  uint32
}

Mutex 结构体由 state sema 两个 4 字节成员组成,其中 state 表示了当前锁的状态, sema 是用于控制锁的信号量

state 字段的最低三位表示三种状态,分别是 mutexLocked mutexWoken mutexStarving ,剩下的用于统计当前在等待锁的 goroutine 数量

  • mutexLocked 表示是否处于锁定状态
  • mutexWoken 表示是否处于唤醒状态
  • mutexStarving 表示是否处于饥饿状态

4.2 加锁

互斥锁加锁逻辑如下

  • 当调用 Lock 方法的时候,会先尝试走 Fast Path,也就是如果当前互斥锁如果处于未加锁的状态,尝试加锁,只要加锁成功就直接返回

    func (m *Mutex) Lock() {
    	// Fast path: grab unlocked mutex.
    	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    		return
    	}
    	// Slow path (outlined so that the fast path can be inlined)
    	m.lockSlow()
    }
    
  • 否则的话就进入 slow path

    func (m *Mutex) lockSlow() {
    var waitStartTime int64 // 等待时间
    starving := false // 是否处于饥饿状态
    awoke := false // 是否处于唤醒状态
    iter := 0 // 自旋迭代次数
    old := m.state
    for {
    	// Don't spin in starvation mode, ownership is handed off to waiters
    	// so we won't be able to acquire the mutex anyway.
    	if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
    		// Active spinning makes sense.
    		// Try to set mutexWoken flag to inform Unlock
    		// to not wake other blocked goroutines.
    		if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    			atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    			awoke = true
    		}
    		runtime_doSpin()
    		iter++
    		old = m.state
    		continue
    	}
    
  • lockSlow 方法中可以看到,有一个大的for 循环,不断的尝试去获取互斥锁,在循环的内部,第一步就是判断能否自旋状态。
    进入自旋状态的判断比较苛刻,具体需要满足什么条件呢? runtime_canSpin 源码见下方

    • 当前互斥锁的状态是非饥饿状态,并且已经被锁定了
    • 自旋次数不超过 4 次
    • cpu 个数大于一,必须要是多核 cpu
    • 当前正在执行当中,并且队列空闲的 p 的个数大于等于一
    // Active spinning for sync.Mutex.
    //go:linkname sync_runtime_canSpin sync.runtime_canSpin
    //go:nosplit
    func sync_runtime_canSpin(i int) bool {
        // 自旋次数不超过4
        // cpu个数大于1--所以必须是多核CPU
        // 队列空闲的p的个数大于等于1
    	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
    		return false
    	}
    	if p := getg().m.p.ptr(); !runqempty(p) {
    		return false
    	}
    	return true
    }
    
  • 如果可以进入自旋状态之后就会调用 runtime_doSpin 方法进入自旋, doSpin 方法会调用 procyield(30) 执行三十次 PAUSE 指令

    TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL	cycles+0(FP), AX
    again:
    	PAUSE
    	SUBL	$1, AX
    	JNZ	again
    	RET
    

    为什么使用 PAUSE 指令呢?
    PAUSE 指令会告诉 CPU 我当前处于处于自旋状态,这时候 CPU 会针对性的做一些优化,并且在执行这个指令的时候 CPU 会降低自己的功耗,减少能源消耗

    if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
        awoke = true
    }
    
  • 在自旋的过程中会尝试设置 mutexWoken 来通知解锁,从而避免唤醒其他已经休眠的 goroutine 在自旋模式下,当前的 goroutine 就能更快的获取到锁

    new := old
    // Don't try to acquire starving mutex, new arriving goroutines must queue.
    if old&mutexStarving == 0 {
    	new |= mutexLocked
    }
    if old&(mutexLocked|mutexStarving) != 0 {
    	new += 1 << mutexWaiterShift
    }
    // The current goroutine switches mutex to starvation mode.
    // But if the mutex is currently unlocked, don't do the switch.
    // Unlock expects that starving mutex has waiters, which will not
    // be true in this case.
    if starving && old&mutexLocked != 0 {
    	new |= mutexStarving
    }
    if awoke {
    	// The goroutine has been woken from sleep,
    	// so we need to reset the flag in either case.
    	if new&mutexWoken == 0 {
    		throw("sync: inconsistent mutex state")
    	}
    	new &^= mutexWoken
    }
    
  • 自旋结束之后就会去计算当前互斥锁的状态,如果当前处在饥饿模式下则不会去请求锁,而是会将当前 goroutine 放到队列的末端

    if atomic.CompareAndSwapInt32(&m.state, old, new) {
    if old&(mutexLocked|mutexStarving) == 0 {
        break // locked the mutex with CAS
    }
    // If we were already waiting before, queue at the front of the queue.
    queueLifo := waitStartTime != 0
    if waitStartTime == 0 {
        waitStartTime = runtime_nanotime()
    }
    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    old = m.state
    if old&mutexStarving != 0 {
        // If this goroutine was woken and mutex is in starvation mode,
        // ownership was handed off to us but mutex is in somewhat
        // inconsistent state: mutexLocked is not set and we are still
        // accounted as waiter. Fix that.
        if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
            throw("sync: inconsistent mutex state")
        }
        delta := int32(mutexLocked - 1<<mutexWaiterShift)
        if !starving || old>>mutexWaiterShift == 1 {
            // Exit starvation mode.
            // Critical to do it here and consider wait time.
            // Starvation mode is so inefficient, that two goroutines
            // can go lock-step infinitely once they switch mutex
            // to starvation mode.
            delta -= mutexStarving
        }
        atomic.AddInt32(&m.state, delta)
        break
    }
    awoke = true
    iter = 0
    }
    

状态计算完成之后就会尝试使用 CAS 操作获取锁,如果获取成功就会直接退出循环
如果获取失败,则会调用 runtime_SemacquireMutex(&m.sema, queueLifo, 1) 方法保证锁不会同时被两个 goroutine 获取。runtime_SemacquireMutex 方法的主要作用是:

  • 不断调用尝试获取锁
  • 休眠当前goroutine
  • 等待信号量, 唤醒 goroutine

goroutine 被唤醒后就会去判断当前是否处于饥饿模式,如果当前等待超过1ms 就会进入饥饿模式

  • 饥饿模式下: 会获得互斥锁,如果等待队列中只存在当前Goroutine, 互斥锁还会从饥饿模式中退出
  • 正常模式下: 会设置唤醒和饥饿标识, 重置迭代次数并重新执行获取锁的循环

4.3 解锁

加锁比解锁简单多了,原理直接参考源码的注释

// 解锁没有绑定关系,可以一个 goroutine 锁定,另外一个 goroutine 解锁
func (m *Mutex) Unlock() {
	// Fast path: 直接尝试设置 state 的值,进行解锁
	new := atomic.AddInt32(&m.state, -mutexLocked)
    // 如果减去了 mutexLocked 的值之后不为零就会进入慢速通道,这说明有可能失败了,或者是还有其他的 goroutine 等着
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

func (m *Mutex) unlockSlow(new int32) {
    // 解锁一个没有锁定的互斥量会报运行时错误
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
    // 判断是否处于饥饿模式
	if new&mutexStarving == 0 {
        // 正常模式
		old := new
		for {
			// 如果当前没有等待者.或者 goroutine 已经被唤醒或者是处于锁定状态了,就直接返回
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// 唤醒等待者并且移交锁的控制权
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// 饥饿模式,走 handoff 流程,直接将锁交给下一个等待的 goroutine,注意这个时候不会从饥饿模式中退出
		runtime_Semrelease(&m.sema, true, 1)
	}
}

通过源码注解,总结解锁需要注意以下

  1. 解锁一个没有锁定的互斥量会报运行时错误

五. 读写锁(RWMutex)

读写锁相对于互斥锁来说粒度更细,使用读写锁可以并发读,但是不能并发读写,或者并发写写

5.1 案例

大部分的业务应用都是读多写少的场景,这个时候使用读写锁的性能就会比互斥锁要好一些,例如下面的这个例子,是一个配置读写的例子,我们分别使用读写锁和互斥锁实现

  1. 读写锁
// RWMutexConfig 读写锁实现
type RWMutexConfig struct {
	rw   sync.RWMutex
	data []int
}

// Get get config data
func (c *RWMutexConfig) Get() []int {
	c.rw.RLock()
	defer c.rw.RUnlock()
	return c.data
}

// Set set config data
func (c *RWMutexConfig) Set(n []int) {
	c.rw.Lock()
	defer c.rw.Unlock()
	c.data = n
}
  1. 互斥锁

// MutexConfig 互斥锁实现
type MutexConfig struct {
	data []int
	mu   sync.Mutex
}

// Get get config data
func (c *MutexConfig) Get() []int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.data
}

// Set set config data
func (c *MutexConfig) Set(n []int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data = n
}

并发基准测试,测试两种锁的性能

type iConfig interface {
	Get() []int
	Set([]int)
}

func bench(b *testing.B, c iConfig) {
	b.RunParallel(func(p *testing.PB) {
		for p.Next() {
			c.Set([]int{100})
			c.Get()
			c.Get()
			c.Get()
			c.Set([]int{100})
			c.Get()
			c.Get()
		}
	})
}

func BenchmarkMutexConfig(b *testing.B) {
	conf := &MutexConfig{data: []int{1, 2, 3}}
	bench(b, conf)
}

func BenchmarkRWMutexConfig(b *testing.B) {
	conf := &RWMutexConfig{data: []int{1, 2, 3}}
	bench(b, conf)
}

执行测试结果如下

root@failymao:/mnt/d/gopath/src/Go_base/daily_test/mutex# go test -race -bench=.
goos: linux
goarch: amd64
pkg: Go_base/daily_test/mutex
BenchmarkMutexConfig-8            179932              5820 ns/op
BenchmarkRWMutexConfig-8          279578              3939 ns/op
PASS
ok      Go_base/daily_test/mutex        3.158s

可以看到首先是没有 data race 问题,其次读写锁的性能几乎是互斥锁的一倍

5.2 源码解析

5.2.1 基本结构

type RWMutex struct {
    w           Mutex  // 复用互斥锁
	writerSem   uint32 // 信号量,用于写等待读
	readerSem   uint32 // 信号量,用于读等待写
	readerCount int32  // 当前执行读的 goroutine 数量
	readerWait  int32  // 写操作被阻塞的准备读的 goroutine 的数量
}

由于复用了互斥锁的代码,读写锁的源码很简单

5.2.2 读锁

加锁

func (rw *RWMutex) RLock() {
    // 直接对读的goroutine记录加1,并判断当前执行的读的goroutines数量是否为空
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}

首先是读锁, atomic.AddInt32(&rw.readerCount, 1) 调用这个原子方法,对当前在读的数量加一,如果返回负数,那么说明当前有其他写锁,这时候就调用 runtime_SemacquireMutex 休眠 goroutine 等待被唤醒

解锁

func (rw *RWMutex) RUnlock() {
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
}

解锁的时候对正在读的操作减一,如果返回值小于 0 那么说明当前有在写的操作,这个时候调用 rUnlockSlow 进入慢速通道

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// A writer is pending.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

5.2.3 写锁

写锁

func (rw *RWMutex) Lock() {
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
}

首先调用互斥锁的 lock,获取到互斥锁之后

  • atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) 调用这个函数阻塞后续的读操作
  • 如果计算之后当前仍然有其他 goroutine 持有读锁,那么就调用 runtime_SemacquireMutex 休眠当前的 goroutine 等待所有的读操作完成

解锁

func (rw *RWMutex) Unlock() {
	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
}

解锁的操作,会先调用 atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) 将恢复之前写入的负数,然后根据当前有多少个读操作在等待,循环唤醒

六.参考

  1. https://mojotv.cn/go/golang-muteex-starvation
  2. https://lailin.xyz/post/go-training-week3-sync.html
  3. https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
go-restful实现一个webserver发布时间:2022-07-10
下一篇:
《Go语言编程》【2.7 完整示例】的错误发布时间:2022-07-10
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap