• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

GO语言并发

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

并发与并行

并发:同一时间段执行多个任务
并行:同一时刻执行多个任务
Go语言的并发通过goroutine实现。goroutine类似于线程,属于用户态的线程,我们可以根据需要创建成千上万个goroutine并发工作。goroutine是由Go语言的运行时调度完成,而线程是由操作系统调度完成。
Go语言还提供channel在多个goroutine间进行通信。goroutine和channel是 Go 语言秉承的 CSP(Communicating Sequential Process)并发模式的重要实现基础。

goroutine

goroutine 的概念类似于线程,但 goroutine 由 Go 程序运行时的调度和管理。Go 程序会智能地将 goroutine 中的任务合理地分配给每个 CPU。Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。

使用goroutine

Go 程序中使用go关键字为一个函数创建一个goroutine。一个函数可以被创建多个 goroutine,一个goroutine必定对应一个函数。

启动单个goroutine

在调用的函数(普通函数和匿名函数)前面加上一个go关键字。

func hello() {
    fmt.Println("Hello ares!")
}
func main() {
    hello()
    fmt.Println("Hello BJ!")
}
#串行执行,先输出Hello ares!后输出Hello BJ!

在调用hello函数前面加上关键字go,也就是启动一个goroutine去执行hello这个函数。

func hello() {
    fmt.Println("Hello ares!")
}
func main() {
    go hello() // 启动另外一个goroutine去执行hello函数
    fmt.Println("Hello BJ!")
}
#只输出了Hello BJ!因为在程序启动时,Go程序就会为main()函数创建一个默认的goroutine。当main()函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束。

让main函数等待hello函数,可使用sleep

func hello() {
    fmt.Println("Hello ares!")
}
func main() {
    go hello()
    fmt.Println("Hello BJ!")
    time.Sleep(time.Second) 
}
#先输出Hello BJ!后输出Hello ares!因为在创建新的goroutine的时候需要花费一些时间,而此时mian函数所在的goroutine是继续执行的。

sync.WaitGroup

Go语言中可以使用sync.WaitGroup来实现并发任务的同步。
sync.WaitGroup是一个结构体,传递的时候要传递指针。

方法名 功能
(wg * WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。

var wg sync.WaitGroup
func hello() {
    defer wg.Done()
    fmt.Println("Hello ares!")
}
func main() {
    wg.Add(1)
    go hello()
    fmt.Println("Hello BJ!")
    wg.Wait()
}

启动多个goroutine

var wg sync.WaitGroup
func hello(i int) {
    defer wg.Done()
    fmt.Println("Hello ares!",i)
}
func main() {
    for i:=0;i<10;i++{
        wg.Add(1)
        go hello(i)
    }
    wg.Wait()
}

每次打印的顺序不一样。这是因为10个goroutine是并发执行的,而goroutine的调度是随机的。

goroutine与线程

可增长的栈

OS线程(操作系统线程)一般都有固定的栈内存(通常为2MB),一个goroutine的栈在其生命周期开始时只有很小的栈(典型情况下2KB),goroutine的栈不是固定的,可以按需增大和缩小,goroutine的栈大小限制可以达到1GB,虽然极少会用到。所以在Go语言中一次创建十万左右的goroutine也是可以的。

goroutine调度

OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。goroutine的调度不需要切换内核语境,所以调用一个goroutine比调度一个线程成本低很多。

GOMAXPROCS

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。
Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。
Go1.5版本之前,默认使用的是单核心执行。Go1.5版本之后,默认使用全部的CPU逻辑核心数。
将任务分配到不同的CPU逻辑核心上实现并行示例:

func a() {
    for i:=0;i<10;i++{
        fmt.Println("A:",i)
    }
}

func b()  {
    for i:=0;i<10;i++{
        fmt.Println("B:",i)
    }
}

func main()  {
    runtime.GOMAXPROCS(1)
    #runtime.GOMAXPROCS(2) 使用两个cpu,此时两个任务并行执行
    go a()
    go b()
    time.Sleep(time.Second)
}

Go语言中的操作系统线程和goroutine的关系:

  • 一个操作系统线程对应用户态多个goroutine。
  • go程序可以同时使用多个操作系统线程。
  • goroutine和OS线程是多对多的关系,即m:n。

channel

go语言的并发模型是CSP,提倡通过通信共享内存而不是通过共享内存而实现通信。
channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

声明channel

格式:

var 变量 chan 元素类型
元素类型可以是任意类型

创建channel

通道是引用类型,通道类型的空值是nil。

var ch chan int
fmt.Println(ch) // <nil>

声明的通道后需要使用make函数初始化之后才能使用。 创建channel的格式如下:

make(chan 元素类型, [缓冲大小])

channel操作

通道有发送(send)、接收(receive)和关闭(close)三种操作。
发送和接收都使用<-符号。

发送

ch <- 19 #将19发动到ch中

接受

x := <- ch // 从ch中接收值并赋值给变量x
<-ch       // 从ch中接收值,忽略结果

关闭

close(ch)

示例:

func main()  {
    var ch1 chan int
    var ch2 chan string
    fmt.Println(ch1)    //nil
    fmt.Println(ch2)    //nil
    ch3 := make(chan int,5)
    ch3 <- 10
    ret := <- ch3
    fmt.Println(ch3)    //0xc000096000
    fmt.Println(ret)    //10
    close(ch3)
    fmt.Println(ch3)    //0xc000096000
    fmt.Println(ret)    //10
}

关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点:

  • 对一个关闭的通道再发送值就会导致panic。
  • 对一个关闭的通道进行接收会一直获取值直到通道为空。
  • 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  • 关闭一个已经关闭的通道会导致panic。

无缓冲的通道

无缓冲的通道又称为阻塞的通道。无缓冲的通道必须有接收才能发送。
错误示范,无接收值的无缓冲通道:

func main() {
    ch := make(chan int )
    ch <- 19
    fmt.Println("succeed")
}

无接收值,可以编译,但无法执行,错误:fatal error: all goroutines are asleep - deadlock!代码会阻塞在ch <- 19这一行代码形成死锁。
可以使用goroutine去接收值来解决:

func recv(c chan int) {
    ret := <- c
    fmt.Println("succeed:",ret) //succeed: 19
}
func main() {
    ch := make(chan int )
    go recv(ch) // 启用goroutine从通道接收值
    ch <- 19
    fmt.Println("succeed")  //succeed
}

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。
使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。

有缓冲通道

在使用make函数初始化通道的时候为其指定通道的容量,例如:

func main() {
    ch := make(chan int,10)
    ch <- 10
    fmt.Println("succeed:",ch)
    fmt.Println(len(ch),cap(ch))//1 10
}

只要通道的容量大于零,那么该通道就是有缓冲的通道,通道的容量表示通道中能存放元素的数量。
可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量。

优雅的从通道循环取值

当通道被关闭时,往该通道发送值会引发panic,从该通道里接收的值一直都是类型零值。通常使用的是for range的方式判断一个通道是否被关闭:

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    // 开启goroutine将0~100的数发送到ch1中
    go func() {
        for i :=0;i < 100;i++{
            ch1 <- i
        }
        close(ch1)
    }()
    // 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
    go func() {
        for {
            i,ok := <- ch1
            if !ok {
                break
            }
            ch2 <- i * i
        }
        close(ch2)
    }()
    // 在主goroutine中从ch2中接收值打印
    for i:= range ch2{
        fmt.Println(i)
    }
}

单向通道

有的时候我们会将通道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用通道都会对其进行限制,比如只能发送或只能接收。Go语言中提供了单向通道来处理这种情况。

func counter(out chan<- int) {
    for i :=0;i<10;i++{
        out <- i
    }
    close(out)
}
func squarer(out chan <- int,in <- chan int)  {
    for i := range in{
        out <- i* i
    }
    close(out)
}
func printer(in <- chan int)  {
    for i := range in{
        fmt.Println(i)
    }
}
func main()  {
    ch1 := make(chan int)
    ch2 := make(chan int)
    go counter(ch1)
    go squarer(ch2,ch1)
    printer(ch2)
}

chan<- int是一个只能发送的通道,可以发送但是不能接收;<-chan int是一个只能接收的通道,可以接收但是不能发送。在函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但反过来是不可以的。

select多路复用

在某些场景下我们需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以接收将会发生阻塞。
Go内置了select关键字,可以同时响应多个通道的操作。select的使用类似于switch语句,它有一些列case分支和一个默认的分支。每个case会对应一个通道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。具体格式如下:

select{
    case <-ch1:
        ...
    case data := <-ch2:
        ...
    case ch3<-data:
        ...
    default:
        默认操作
}

示例:

func main() {
//声明一个存放int类型,容量为10的通道
    ch := make(chan int,10)
    for i:=0;i<10;i++{
        select {
        case x := <- ch:  //尝试从ch中接收值
            fmt.Println(x)
        case ch <- i: // 尝试向ch中发送数据
        }
    }
}

使用select语句能提高代码的可读性。如果多个case同时满足,select会随机选择一个。对于没有case的select{}会一直等待。

并发安全和锁

在Go代码中可能会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态)。

var x int64
var wg sync.WaitGroup

func add()  {
    for i:=0;i<10;i++{
        x += 1
    }
    wg.Done()
}
func main()  {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

上面的代码中开启了两个goroutine去累加变量x的值,这两个goroutine在访问和修改x变量的时候就会存在数据竞争,导致最后的结果与期待的不符。

互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。

var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
    for i:=0;i<10;i++{
        lock.Lock() //加锁
        x += 1
        lock.Unlock()   //解锁
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}

使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。

读写互斥锁

读写锁在Go语言中使用sync包中的RWMutex类型。
读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

var(
    x   int64
    wg  sync.WaitGroup
    lock    sync.Mutex
    rwlock sync.RWMutex
)

func write() {
    rwlock.Lock()   //加写锁
    x += 1
    time.Sleep(time.Millisecond * 10)   //操作耗时10ms
    rwlock.Unlock() //解锁
    wg.Done()
}

func read() {
    rwlock.RLock()  //加读锁
    time.Sleep(time.Millisecond)    //读操作耗时1ms
    rwlock.RUnlock()    //解锁
    wg.Done()
}

func main() {
    start := time.Now()
    for i:=0;i<10;i++{
        wg.Add(1)
        go write()
    }
    for i:=0;i<1000;i++{
        wg.Add(1)
        go read()
    }
    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}

读写锁非常适合读多写少的场景!

sync.Once

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

sync.Map

Go语言中内置的map不是并发安全的。

var m = make(map[string]int)

func get(key string) int {
    return m[key]
}
func set(key string,value int)  {
    m[key] = value
}
func main()  {
    wg := sync.WaitGroup{}
    for i := 0;i<20;i++{
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(n)
            set(key,n)
            fmt.Printf("k=:%v,v=%v\n",key,get(key))
            wg.Done()
        }(i)
    }
    wg.Wait()
}

上面的代码开启少量几个goroutine的时候可能没什么问题,当并发多了之后执行上面的代码就会报fatal error: concurrent map writes错误。
Go语言的sync包中提供了一个开箱即用的并发安全版map–sync.Map。开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。同时sync.Map内置了诸如Store、Load、LoadOrStore、Delete、Range等操作方法。

var m = sync.Map{}

func main()  {
    wg := sync.WaitGroup{}
    for i:=0;i<20;i++{
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(n)
            m.Store(key,n)
            value,_ := m.Load(key)
            fmt.Printf("k=:%v,v=%v\n",key,value)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

原子操作

代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go语言中原子操作由内置的标准库sync/atomic提供。
atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。
以后用到再补充!


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
[golang]在Go中处理时区发布时间:2022-07-10
下一篇:
go语言学习历程发布时间:2022-07-10
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap