5.并发编程
5.1并发介绍
5.1.1并发与并行
- 多线程程序在一个核的cpu上运行,就是并发。
- 多线程程序在多个核的cpu上运行,就是并行。
5.1.2协程
- 协程独立的栈空间,并共享堆空间,调度由用户自己控制,本质上类似于用户级县城
- Goroutine 由官方实现的超级“线程池”。
5.2 Goroutine
- 在JAVA/c++中我们要实现并发编程,我们要自己维护一个线程池,并且需要自己去包装一个又一个任务。同时需要自己去调度线程执行任务,并维护上下文切换。这一切通常会耗费程序员大量的心智。
- 而GO语言中goroutine只需程序员自己定义很多任务,系统去帮助我们把这些任务分配到CPU上实现并发执行。goroutine概念类似于线程,GO程序会智能将goroutine中的人物合理地分配给每个CPU,Go语言之所以被称为现代化编程语言。就是因为它在语言层面已经内置了调度和上下文切换的机制。
5.2.1使用goroutine
-
Go语言中使用goroutine非常简单,只需要调用函数时候前面加上go关键字就可以为一个函数创建一个goroutine。
-
启动一个goroutine:
package main
import (
"fmt"
"time"
)
func hello() {
fmt.Println("hello world!")
}
func main() {
go hello()
fmt.Println("main goroutine done!")
time.Sleep(time.Second)
}
// 当main函数返回结果时候,该goroutine就结束了,所有main函数中启动goroutine会一同结束。所以这里要time.Sleep一下,等一下helllo函数
5.2.2启动多个goroutine
-
启动多个grouting。
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done()
fmt.Println("hello Goroutine!", i)
}
func main() {
for i:=0;i<10;i++ {
wg.Add(1) // 启动一个goroutine就登记+1
go hello(i)
}
wg.Wait()
}
hello Goroutine! 1
hello Goroutine! 6
hello Goroutine! 9
hello Goroutine! 0
hello Goroutine! 3
hello Goroutine! 2
hello Goroutine! 8
hello Goroutine! 7
hello Goroutine! 5
hello Goroutine! 4
-
你会发现执行顺序闭一只,这是因为goroutine是并发执行的。而goroutine调度是随机的。
-
需要知道的事主协程退出了,其他任务就不会执行了。
5.2.3goroutine调度
-
GPM是Go语言运行时层面的实现。是GO语言自己实现一套调度系统。
- 1.G很好理解,就是个goroutine的,里面除了存放本goroutine信息外 还有与所在P的绑定等信息。
- 2.P管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。
- 3.M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是一一映射的关系, 一个groutine最终是要放到M上执行的;(与内核线程形成虚拟映射)
P与M一般也是一一对应的。他们关系是: P管理着一组G挂载在M上运行。当一个G长久阻塞在一个M上时,runtime会新建一个M,阻塞G所在的P会把其他的G 挂载在新建的M上。当旧的G阻塞完成或者认为其已经死掉时 回收旧的M。
P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。
单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。
5.3 runtime包
5.3.1 runtime.Gosched()
5.3.2 runtime.Goexit()
5.3.3 runtime.GoMAXPROCS
-
Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。
package main
import (
"fmt"
"runtime"
"time"
)
func a() {
for i:=1;i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i:=1;i<10;i++ {
fmt.Println("B:",i)
}
}
func main() {
runtime.GOMAXPROCS(2)// 指定CPU核心数2个
// a,b 任务并行执行。
go a()
go b()
time.Sleep(time.Second)
}
-
Go语言中操作系统线程和goroutine关系:
- 一个操作系统线程对应用户态多个goroutine。
- go程序可以同时使用多个操作系统线程。
- goroutine和OS线程是多对多的关系。
5.4 Channel
- 单纯讲函数并发执行意义不大,函数与函数之间需要进行数据交互才可以体现并发执行函数的意义。
- 虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容器发生竞态问题,为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。
- Go语言并发模型是CSP,提倡通过通信共享内存而不是通过共享内存而实现通信。
- 如果说goroutine是GO程序并发的执行体,channel就是他们之间的连接。channel可以让一个goroutine发送特定值到另一个goroutine的通信机制。
- Go语言中的通道是一种特殊类型,通道像一个传送带或队列,遵循先入先出的规则。保证收发数据顺序,每一个通道都是一个具体类型的导管。也就是声明channel时候需要为其他指定元素类型。
5.4.1channel类型
5.4.2 创建channel
5.4.3channel操作
-
通道有发送,接收和关闭三种状态。
-
发送和接收都是用<-符号。
// 定义一个通道
ch := make(chan int)
// 发送
ch <- 10 //把10发送到ch中
// 接收
x := <- ch // 从ch中接收值并赋值给变量x
// 关闭
close(ch)
-
关闭通道有以下特点:
1.对一个关闭的通道再发送值会导致panic
2.对一个关闭的通道进行接收会一直获取值直到通道为空。
3.对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值
4.关闭一个已经关闭的通道会导致panic
5.4.4无缓冲的通道
-
无缓冲通道为阻塞通道,如下代码:
package main
import "fmt"
func main() {
ch := make(chan int)
ch <- 10
fmt.Println("发送成功")
}
上面代码会报错:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/Users/xujunkai/go/src/mypro/runtime_demo/main.go:7 +0x54
为什么会出现死锁?
应为我们使用ch:=make(chan int)创建是无缓冲的通道,无缓冲通道只有在有人接收值的时候才能发送值。
package main
import "fmt"
func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) //启动goroutine从通道接收值
ch <- 10
fmt.Println("发送成功")
}
无缓冲通道发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时才能发送成功。两个goroutine讲继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。
-
使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。
5.4.5 有缓冲的通道
-
我们可以使用make函数初始化通道时候为其定义通道容量,例如
ch := make(chan int, 1) //创建一个容量为1的有缓冲区通道
只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。
我们可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量,虽然我们很少会这么做
5.4.6 close()
-
通过内置的close()函数关闭channel(如果你的管道不往里存值或者取值的时候一定记得关闭管道)
package main
import "fmt"
func main() {
c := make(chan int)
go func() {
for i:=0;i<5;i++ {
c <- i// 向通道写入数据
}
close(c)//关闭通道
}()
// 主协程
for {
if data, ok := <-c; ok {
fmt.Println(data)//从通道读取数据
} else {
break
}
}
fmt.Println("main结束")
}
5.4.7 优雅方式从通道循环取值
-
当通过通道发送有限数据时,我们可以通过close函数关闭通道来告知从该通道接收值的goroutine停止等待,当通道被关闭时,往该通道发送值会引发panic,从该通道里接收的值一直都是类型零值,那如何判断一个通道是否被关闭了呢?
package main
import "fmt"
func main() {
c := make(chan int)
go func() {
for i:=0;i<5;i++ {
c <- i// 向通道写入数据
}
close(c)//关闭通道
}()
// 主协程
for {
if data, ok := <-c; ok {
fmt.Println(data)//从通道读取数据
} else {
break
}
}
fmt.Println("main结束")
}
从上面例子中我们看到有两种方式在接收值的时候判断通道是否被关闭,for range 方式。if判断
5.4.8 单向通道
-
有时候我们会讲通道作为参数在多个任务函数间传递,很多时候我们在不同任务函数中使用通道都会对其进行限制。比如限制通道在函数中发送或接收。
-
Go语言提供单向通道来处理这种情况,例如,我们把上面例子改造一下:
package main
import "fmt"
func counter(out chan<- int){
for i:=0;i<100;i++ {
out <- i
}
close(out)
}
func squarer(out chan<-int,in <- chan int){
for i:=range in{
out <- i*i
}
close(out)
}
func printer(in <- chan int){
for i:= range in{
fmt.Println(i)
}
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go counter(ch1)
go squarer(ch2,ch1)
printer(ch2)
}
-
其中:
1. chan<- int 是一个智能发送的通道,可以发送但是不能接收。
2. <-chan int 是一个智能接收的通道,可以接收导师不能发送。
5.5Goroutine池
- 本质上实现生产消费者模式。
- 可以有效控制goroutine数量,防止暴涨。
package main
import (
"fmt"
"math/rand"
)
type Job struct {
Id int
// 计算随机数
RandNum int
}
type Result struct {
job *Job//传入Job对象实例
sum int//求和
}
func createPool (num int, jobChan chan *Job, resultChan chan *Result){
// 根据开协程个数,去跑运行
for i:=0; i< num; i++ {
go func(jobChan chan *Job, resultChan chan *Result) {
// 遍历job管道所有数据,进行相加
for job := range jobChan {
r_num := job.RandNum
var sum int
// 将数字每一位相加
for r_num != 0 {
tmp := r_num % 10
sum += tmp
r_num /= 10
}
// 想要的结果是Result
r := &Result{
job: job,
sum: sum,
}
// 运算结果扔到管道
resultChan <- r
}
}(jobChan, resultChan)
}
}
func main() {
// 创建job管道
jobChan := make(chan *Job, 128)
// 创建结果管道
resultChan := make(chan *Result, 128)
// 创建工作池
createPool(64, jobChan, resultChan)
// 开启打印的协程
go func(resultChan chan *Result) {
// 遍历结果管道打印
for result := range resultChan {
fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id,result.job.RandNum,result.sum)
}
}(resultChan)
var id int
// 循环创建job, 输入到管道
for {
id ++
// 生成随机数
r_num := rand.Int()
job := &Job{
Id: id,
RandNum: r_num,
}
jobChan <- job
}
}
5.6定时器
package main
import (
"fmt"
"time"
)
func main() {
ticker := time.NewTicker(1*time.Second)
i := 0
go func() {
for {
i++
fmt.Println(<-ticker.C)
if i == 5{
ticker.Stop()
}
}
}()
for {}
}
5.7 select
- 从多个通道接收数据,通道在接收数据时。如果没有数据可以接收将会阻塞,你也会写出如下代码方式来实现:
for {
data, ok := <- ch1
data, ok := <- ch2
}
- 上面方式可以实现多通道接收的需求,但是运行性能会很差,对于此场景,GO内置select关键字,可以同时响应多个通道的操作。
- select 使用类似于switch语句,它有一系列case分支和一个默认的分支,每个case会对应一个通道的通信过程,select会一直等待,知道某个case通信操作完成时,就会执行case分支对应语句,具体如下:
select {
case <- chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
- select 可以同时监听一个或多个channel,直到其中一个channel ready
package main
import (
"fmt"
"time"
)
func test1(ch chan string){
time.Sleep(time.Second*5)
ch <- "test1"
}
func test2(ch chan string){
time.Sleep(time.Second * 2)
ch <- "test2"
}
func main() {
output1 := make(chan string)
output2 := make(chan string)
// 跑2个子协程,写入数据
go test1(output1)
go test2(output2)
// select 监控
select {
case s1:= <-output1:
fmt.Println("s1:",s1)
case s2 := <-output2:
fmt.Println("s2:",s2)
}
}
- 如果多个channel 同时ready, 则随机选择一个执行
package main
import (
"fmt"
)
func main() {
// 创建2个管道
int_chan := make(chan int, 1)
string_chan := make(chan string, 1)
go func() {
//time.Sleep(2 * time.Second)
int_chan <- 1
}()
go func() {
string_chan <- "hello"
}()
select {
case value := <-int_chan:
fmt.Println("int:", value)
case value := <-string_chan:
fmt.Println("string:", value)
}
fmt.Println("main结束")
}
package main
import (
"fmt"
"time"
)
func main() {
// 创建管道
output1 := make(chan string, 10)
go write(output1)
// 取数据
for s:= range output1 {
// 每秒取数据
fmt.Println("read res:", s)
time.Sleep(time.Second)
}
}
func write(ch chan string) {
for {
select {
case ch <- "hello":
fmt.Println("write hello")
default:
fmt.Println("channel is full")
}
// 每0.5秒去写入数据
time.Sleep(time.Millisecond * 500)
}
}
5.8 并发安全和锁
-
有时候会有多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题。类比现实生活中的例子有十字路口被各个方向的的汽车竞争;还有火车上的卫生间被车厢里的人竞争。
-
e.g.
package main
import (
"fmt"
"sync"
)
var x int64
var wg sync.WaitGroup
func add() {
for i:=0;i<5000;i++ {
x = x + 1
}
wg.Done()
}
func main() {
wg.Add(2)// 添加2个goroutine
go add()
go add()
wg.Wait()
fmt.Println(x)
}
6562
-
我们开启两个goroutine去累加变量x的值,这两个goroutine在访问和修改x变量时候会存在数据竞争,导致最后结果与期待的不符。
5.8.1互斥锁
-
互斥锁是一种常见控制共享资源的方法,它能够保证同时只有一个goroutine可以访问共享资源,Go语言中使用sync包的Mutex类型来实现互斥锁,使用互斥锁来修复上面代码:
package main
import (
"fmt"
"sync"
)
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
for i:=0;i<5000;i++ {
lock.Lock()// 加锁
x = x + 1
lock.Unlock()// 解锁
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
10000
-
使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在外面等待解锁,当互斥锁释放后,等待的goroutine才可以获取锁进入临界区。多个goroutine同时等待一个锁时,唤醒策略时随机的。
5.8.2读写互斥锁
-
互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。
-
读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。
-
读写锁:
package main
import (
"fmt"
"sync"
"time"
)
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex
)
func write() {
rwlock.Lock()//加写锁
x = x+1
time.Sleep(10*time.Millisecond)// 胶乳读操作耗时10毫秒
rwlock.Unlock()//解写锁
wg.Done()
}
func read() {
rwlock.RLock()//加读锁
time.Sleep(time.Millisecond) //假设读操作耗时1毫秒
rwlock.RUnlock()//解读锁
wg.Done()
}
func main() {
start := time.Now()
for i:=0;i<10;i++ {
wg.Add(1)
go write()
}
for i:=0;i<1000;i++ {
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
-
需要注意的是读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。
5.9 Sync
5.9.1 sync.WaitGroup
-
在Go语言中使用sync.WaitGroup来实现并发任务的同步,而sync.WaitGroup有以下几个方法:
方法名 |
功能 |
(Wg *WaitGroup)Add(delta int) |
计数器+delta |
(Wg *WaitGroup) Done() |
计数器-1 |
(Wg *WaitGroup) Wait() |
阻塞直到计数器变为0 |
-
Sync.WaitGroup内部维护一个计数器,计数器的值可以增加和减少,例如当我们启动N个并发任务时,就将计数器值增加N,每个任务完成时通过Done()方法将计数器减1.通过调用Wait()来等待并发任务执行完。当计数器值为0时,表示所有并发任务已经完成。
-
需要注意: sync.WaitGroup是一个结构体,传递的时候要传递指针。
var wg sync.WaitGroup
func hello() {
defer wg.Done()
fmt.Println("hello Goroutine!")
}
func main() {
wg.Add(1)
go hello()
fmt.Println("main goroutine done!")
wg.Wait()
}
5.9.2 sync.Once
-
说在前面的话:这是一个进阶知识点。
在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等。
Go语言中的sync包中提供了一个针对只执行一次场景的解决方案–sync.Once。
sync.Once只有一个Do方法,其签名如下
func (o *Once) Do(f func()) {}
注意:如果要执行函数f需要传递参数就需要搭配闭包来使用。
package main
import (
"image"
"sync"
)
var icons map[string]image.Image
var loadIconsOnce sync.Once
func loadIcons(){
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 是并发安全的
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。
5.9.3 sync.Map
package main
import (
"fmt"
"strconv"
"sync"
)
var m = make(map[string]int)
func get(key string) int {
return m[key]
}
func set(key string, value int) {
m[key] = value
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
set(key, n)
fmt.Printf("k=:%v,v:=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
}
- 上述代码启动少量几个goroutine时候可能没什么问题,当并发多了之后执行上面代码就会抱错fatal error: concurrent map writes。
- 这种情况需要为map加锁来保证并发安全性。Go语言sync包中提供一个开箱即用的并发安全版的map-sync.Map.开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。同时sync.Map内置了诸如Store、Load、LoadOrStore、Delete、Range等操作方法。
package main
import (
"fmt"
"strconv"
"sync"
)
var m = sync.Map{}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m.Store(key, n)
value, _ := m.Load(key)
// value, _ := m.Load(key)
fmt.Printf("k:=%v,v:=%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
}
k:=3, v=3
k:=0, v=0
k:=8, v=8
k:=6, v=6
k:=19, v=19
k:=5, v=5
k:=9, v=9
k:=10, v=10
k:=2, v=2
k:=11, v=11
k:=12, v=12
k:=13, v=13
k:=14, v=14
k:=15, v=15
k:=16, v=16
k:=17, v=17
k:=18, v=18
k:=1, v=1
k:=7, v=7
k:=4, v=4
5.10原子操作(atomic包)
-
在代码中加锁操作因为设计内核态上下文切换会比较耗时,代价比较高。针对基本数据类型我们还可以使用原子操作保证并发安全。因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go语言中原子操作由内置的标准库sync/atomic提供
-
比较加互斥锁和原子操作的性能。
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var x int64
var l sync.Mutex
var wg sync.WaitGroup
func add() {
x++
wg.Done()
}
func muteAdd() {
l.Lock()
x++
l.Unlock()
wg.Done()
}
func atomicAdd(){
atomic.AddInt64(&x, 1)//修改操作
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i<10000; i++ {
wg.Add(1)
go add()
// go atomicAdd()//3.969783ms
//go muteAdd()//4.556701ms
}
wg.Wait()
end := time.Now()
fmt.Println(x)
fmt.Println(end.Sub(start))
}
atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好
|
请发表评论