楔子
这次我们说一说 Go 的并发编程,并发可以说是 Go 语言的一个最大的卖点,因为它在语言层面上就支持并发,而且使用方式非常简单。
在早期,CPU 都是以单核的形式顺序执行机器指令。Go 语言的祖先 C 语言正是这种顺序编程语言的代表。顺序编程语言中的顺序是指:所有的指令都是以串行的方式执行,在相同的时刻有且仅有一个 CPU 在顺序执行程序的指令。
但随着处理器技术的发展,单核时代以提升处理器频率来提高运行效率的方式遇到了瓶颈,目前各种主流的 CPU 频率基本被锁定在了 3GHZ 附近。单核 CPU 的发展的停滞,给多核 CPU 的发展带来了机遇。相应地,编程语言也开始逐步向并行化的方向发展,而 Go 语言正是在多核和网络化的时代背景下诞生的原生支持并发的编程语言。
并发与并行
关于并发和并行的区别:
并行: 在一个时间点可以执行多个任务;
并发: 在一个时间段可以执行多个任务;
而受限于CPU处理器的核心数,显然不可能对所有任务都执行并行操作,所以我们经常听到的是"高并发",而不是"高并行"。因为"高并行"只能通过硬件方面实现,而"高并发"是可以通过代码层面解决的。
但是在编写高并发程序时,会经常遇见一些并发问题,比如:
1. 数据竞争:简单来说就是两个或多个线程同时读写某个变量,造成了预料之外的结果。
2. 保证原子性:在一个定义好的上下文里,原子性操作不可分割,上下文的定义非常重要。有些代码,你在程序里看起来是原子的,如最简单的 i++,但在机器层面看来,这条语句通常需要几条指令来完成(Load,Incr,Store),不是不可分割的,也就不是原子性的。原子性可以让我们放心地构造并发安全的程序。
3. 内存访问同步:代码中需要控制同时只有一个线程访问的区域称为临界区。Go 语言中一般使用 sync 包里的 Mutex 来完成同步访问控制。锁一般会带来比较大的性能开销,因此一般要考虑加锁的区域是否会频繁进入、锁的粒度如何控制等问题。
4. 死锁:在一个死锁的程序里,每个线程都在等待其他线程,形成了一个首尾相连的尴尬局面,程序无法继续运行下去。
5. 活锁:想象一下,你走在一条小路上,一个人迎面走来。你往左边走,想避开他;他做了相反的事情,他往右边走,结果两个都过不了。之后,两个人又都想从原来自己相反的方向走,还是同样的结果。这就是活锁,看起来都像在工作,但工作进度就是无法前进。
6. 饥饿:并发的线程不能获取它所需要的资源以进行下一步的工作。通常是有一个非常贪婪的线程,长时间占据资源不释放,导致其他线程无法获得资源。
常见的并行编程有多种模型,主要有多线程、消息传递等。从理论上来看,多线程和基于消息的并发编程是等价的。由于多线程并发模型可以自然对应到多核的处理器,主流的操作系统因此也都提供了系统级的多线程支持,同时从概念上讲多线程似乎也更直观,因此多线程编程模型逐步被吸纳到主流的编程语言特性或语言扩展库中。而主流编程语言对基于消息的并发编程模型支持则相比较少,Erlang 语言是支持基于消息传递并发编程模型的代表者,它的并发体之间不共享内存。Go语言是基于消息并发模型的集大成者,它将基于 CSP 模型的并发编程内置到了语言中,通过一个 Go 关键字就可以轻易地启动一个 goroutine,与 Erlang 不同的是 Go 语言的 goroutine 之间是共享内存的。
goroutine 和系统线程
goroutine 是 Go 语言特有的并发体,是一种轻量级的线程,由 go 关键字启动。在真实的 Go 语言的实现中,goroutine 和系统线程也不是等价的。尽管两者的区别实际上只是一个量的区别,但正是这个量变引发了 Go 语言并发编程质的飞跃。
首先,每个系统级线程都会有一个固定大小的栈(一般默认可能是 2MB),这个栈主要用来保存函数递归调用时参数和局部变量。固定了栈的大小导致了两个问题:一是对于很多只需要很小的栈空间的线程来说是一个巨大的浪费,二是对于少数需要巨大栈空间的线程来说又面临栈溢出的风险。针对这两个问题的解决方案是:要么降低固定的栈大小,提升空间的利用率;要么增大栈的大小以允许更深的函数递归调用,但这两者是没法同时兼得的。相反,一个 goroutine 会以一个很小的栈启动(可能是 2KB 或 4KB),当遇到深度递归导致当前栈空间不足时,goroutine 会根据需要动态地伸缩栈的大小(主流实现中栈的最大值可达到 1GB)。因为启动的代价很小,所以我们可以轻易地启动成千上万个 goroutine。
Go 的运行时还包含了其自己的调度器,这个调度器使用了一些技术手段,可以在 n 个操作系统线程上调度 m 个 goroutine。Go 调度器的工作和内核的调度是相似的,但是这个调度器只关注单独的 Go 程序中的 goroutine。goroutine 采用的是半抢占式的协作调度,只有在当前 goroutine 发生阻塞时才会导致调度;同时发生在用户态,调度器会根据具体函数只保存必要的寄存器,切换的代价要比系统线程低得多。运行时有一个 runtime.GOMAXPROCS 函数,用于控制当前运行正常非阻塞 goroutine 的系统线程数目。
而且在 Go 语言中启动一个 goroutine 不仅和调用函数一样简单,而且 goroutine 之间调度代价也很低,这些因素极大地促进了并发编程的流行和发展。
goroutine的使用
下面我们来看看如何在 Go 中启动 goroutine。
package main
import (
"fmt"
)
func f(n int64) {
fmt.Println("f ->", n)
}
func main() {
// Go 语言中启动一个 goroutine 非常简单, 直接通过 go 关键字即可
// 注意: go 关键字后面必须跟一个函数才可以
go f(1)
go f(2)
go f(3)
// 上面三个函数都会启动一个单独的 goroutine 去执行
}
但是当我们执行这段程序时却发现什么都没有输出,原因是程序默认有一个主 goroutine,在启动三个子 goroutine 之后,程序就结束了。因此我们看到主 goroutine 是不会等待子 goroutine 的,如果想象成多线程的话,那么子 goroutine 就类似与守护线程。
因此我们可以让主线程强制等待一下:
package main
import (
"fmt"
"time"
)
func f(n int64) {
// 睡 n 秒
time.Sleep(time.Second * time.Duration(n))
fmt.Println("f ->", n)
}
func main() {
go f(3)
go f(1)
go f(2)
// 写一个死循环, 让主线程一直不结束
// 我们看到 go 里面死循环是不是特别简单呢? 因为 go 里面经常要用到死循环
for {}
/*
f -> 1
f -> 2
f -> 3
*/
}
通过程序输出结果,我们知道这几个函数在执行的时候都是并发执行的,谁先睡完,谁就先打印。三个 goroutine 执行完之后,总共花费了 3 秒钟,因为它们是由不同的 goroutine 执行的;如果不是通过 go 关键字启动,那么三个函数执行完之后,会总共花费六秒钟,而且是顺序打印的。
在使用 goroutine 的时候,经常会不注意调到坑里面:
package main
import (
"fmt"
)
func main() {
for i := 1; i <= 5; i++ {
go func() {
fmt.Printf("goroutine %d\n", i)
}()
}
for {}
/*
goroutine 6
goroutine 6
goroutine 6
goroutine 6
goroutine 6
*/
}
我们看到打印的结果和我们想象中的不一样啊,因为 goroutine 还没执行时,循环就结束了,此时i已经变成了 6;因此解决办法就是,在启动 goroutine 的时候指定好参数。
package main
import (
"fmt"
)
func main() {
for i := 1; i <= 5; i++ {
go func(i int) {
fmt.Printf("goroutine %d\n", i)
}(i)
}
for {}
/*
goroutine 2
goroutine 5
goroutine 1
goroutine 4
goroutine 3
*/
}
因为不知道哪个 goroutine 先执行完毕,所以打印的顺序是不固定的。
说完了 goroutine 的用法之后,我们应该讨论一下主 goroutine 和 子 goroutine 之间的同步关系了。
由于主 goroutine 不会等待子 goroutine,所以我们采用一个死循环,但这样做显然是不科学的;要是循环下面还有代码呢,我们希望某一段代码一定要等到当前子 goroutine 执行完之后才可以执行,这个时候使用死循环显然是不行的。如果采用 time.Sleep 的话显然也是不推荐的,因为不知道子 goroutine 什么时候执行完毕。这个时候,我们就需要通过"等待组"的方式来实现:
package main
import (
"fmt"
"sync"
)
func f(wg *sync.WaitGroup, n int) {
wg.Done()
fmt.Printf("goroutine %d\n", n)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
// "等待组" 内部有一个计数器, 调用 Add 方法让计数器增加, 增加的值等于传入的值
// 每启动一个 goroutine, 就将计数器加 1
wg.Add(1)
// 因为 "等待组" 本质是一个结构体, 所以需要传递指针, 不然就会拷贝一份
go f(&wg, i)
// 在函数f的内部, 我们调用了 wg.Done(), 这个方法会使得 "等待组" 内部的计数器减一
}
// 如果 "等待组" 内部的计数器不为 0, 那么调用 Wait() 会一直阻塞在这里; 如果为 0, 那么会顺利通行
wg.Wait()
fmt.Println("程序结束......")
/*
goroutine 5
goroutine 2
goroutine 1
goroutine 3
goroutine 4
程序结束......
*/
}
在使用 goroutine 的时候,一定要注意死锁的问题,当然在学习 Go 语言的并发编程时,要是没遇见过死锁只能说明你没有好好学。
package main
import (
"fmt"
"sync"
)
func f(wg *sync.WaitGroup, n int) {
fmt.Printf("goroutine %d\n", n)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go f(&wg, i)
}
wg.Wait()
fmt.Println("程序结束......")
/*
goroutine 5
goroutine 2
goroutine 1
goroutine 3
goroutine 4
fatal error: all goroutines are asleep - deadlock!
*/
}
我们看到在 goroutine 执行完之后,就造成了死锁,原因就是函数 f 中我们将 wg.Done() 这行代码给去掉了。当子 goroutine 结束之后,只剩下一个主 goroutine,而且 "等待组" 的计数器不为 0;因为 wg.Wait() 处于阻塞状态,所以就需要有其它的子 goroutine 将 "等待组" 的计数器减少到 0,所以但凡还有一个子 goroutine,都不会报错,而且程序也是等 5 个子 goroutine 都执行完之后才报的错。而子 goroutine 都执行完之后,只剩下主 goroutine 了,而且"等待组" 的计数器还不为 0,这个时候就出现死锁了。
goroutine 之间的调度
在 runtime 包中存在三个函数,可以对 goroutine 进行调度。
runtime.GOMAXPROCS(n):用来设置可以并行计算的 CPU 核数最大值,并返回之前的值。
runtime.Gosched():用于让出 CPU 时间片,让出当前 goroutine 的执行权限,调度器安排其它等待的任务运行,并在下次某个时候从该位置恢复执行。这就像跑接力赛,A 跑了一会碰到代码 runtime.Gosched() 就把接力棒交给 B 了,A 歇着了,B 继续跑。
runtime.Goexit(),调用此函数会立即使当前的 goroutine 的运行终止(终止协程),而其它的 goroutine 并不会受此影响。runtime.Goexit 在终止当前 goroutine 前会先执行此 goroutine 的还未执行的 defer 语句。另外注意千万别在主协程调用 runtime.Goexit,因为会引发 panic。
package main
import (
"fmt"
"runtime"
)
func main() {
// 默认是采用 CPU 的所有核心的, 我们可以查看 CPU 的核心数
fmt.Println(runtime.NumCPU()) // 12
// 设置使用的 CPU 核心数, 如果值小于 1, 则采用默认配置
// 返回 12, 说明默认是跑满了所有的核
fmt.Println(runtime.GOMAXPROCS(0)) // 12
}
然后我们看看在 Go 中,如何主动交出一个 goroutine 的执行权。
package main
import (
"fmt"
"runtime"
"time"
)
func f1(){
runtime.Gosched() // 此处将执行权让出
fmt.Println("f1 -> 1")
// 将控制权让出
runtime.Gosched()
fmt.Println("f1 -> 2")
}
func f2(){
// 这里会是第一个打印
fmt.Println("f2 -> 1")
// 再将控制权让出, 显然会执行f1的第一个打印语句
runtime.Gosched()
fmt.Println("f2 -> 2")
}
func main() {
go f1()
go f2()
time.Sleep(time.Second * 1)
/*
f2 -> 1
f1 -> 1
f2 -> 2
f1 -> 2
*/
// 打印的结果和我们想的是一样的, 首先打印: f2 -> 1, 然后交出执行权
// 再打印 f1 -> 1, 然后交出执行权打印 f2 -> 2, 最后打印 f1 -> 2
}
最后看看goroutine的主动退出:
package main
import (
"fmt"
"runtime"
"time"
)
func f1(){
fmt.Println("f1")
// 注意: 一旦出现了 Gosched 或者 Goexit
// 那么这个函数不能直接调用, 一定要出现在 goroutine 中
// 可以是在 main 函数中 go f1(), 也可以在其它函数中 go f1()
// 如果是直接通过 f1() 调用的话, 那么外层函数一定要通过 goroutine 的方式启动
runtime.Goexit()
}
func f2(){
f1()
fmt.Println("f2")
}
func f3(){
go f1()
fmt.Println("f3")
}
func main() {
// f2 里面调用了 f1, 但 f2 是通过goroutine启动的, 所以是合法的
// 并且会直接将整个 goroutine 停止掉, 因此 fmt.Println("f2") 没有执行
go f2()
time.Sleep(time.Second * 1)
/*
f1
*/
// f3 里面的 f1 函数是通过 goroutine 启动的, 因此 goroutine 里面又启动了一个goroutine
// 因此只会将退出自身的 goroutine, 所以 fmt.Println("f3") 会被执行
go f3()
time.Sleep(time.Second * 1)
/*
f1
f3
*/
// 依旧退出自身的 goroutine, 跟外层无关
f3()
time.Sleep(time.Second)
/*
f1
f3
*/
}
goroutine 中的 defer
我们看看 defer 在 goroutine 中是如何体现的。
package main
import (
"fmt"
)
func f1(){
defer fmt.Println("f1 defer")
fmt.Println("f1")
}
func main() {
// 首先执行 f1(), 因为是 f1() 不是 go f1(), 所以 fmt.Println("func") 一定会在 f1() 执行完毕之后才会执行
// 所以输出是什么很好分析
go func() {
defer fmt.Println("func defer")
f1()
fmt.Println("func")
}()
/*
f1
f1 defer
func
func defer
*/
//里面是 go f1(), 是 goroutine 里面启动一个 goroutine
//因此这是两个 goroutine, 它们之间是没有关系的
go func() {
defer fmt.Println("func defer")
go f1()
fmt.Println("func")
}()
/*
f1
func
func defer
f1 defer
*/
for{}
}
如果里面出现了 return 呢?
package main
import (
"fmt"
)
func f1(){
defer fmt.Println("f1 defer")
return
fmt.Println("f1")
}
func main() {
go func() {
defer fmt.Println("func defer")
f1()
fmt.Println("func")
}()
/*
f1 defer
func
func defer
*/
go func() {
defer fmt.Println("func defer")
go f1()
fmt.Println("func")
}()
/*
func
f1 defer
func defer
*/
for{}
}
结果很简单,肯定有人决定太简单了,没错下面重点来了,如果我们把 return 换一下呢?
package main
import (
"fmt"
"runtime"
)
func f1(){
defer fmt.Println("f1 defer")
runtime.Goexit() // goroutine退出
fmt.Println("f1")
}
func main() {
// 在 f1 中将整个 goroutine 都给终止掉了, 因为是同一个 goroutine, 因此两个fmt.Println 函数都不会执行
// 但是: 两个 defer 都会调用
go func() {
defer fmt.Println("func defer")
f1()
fmt.Println("func")
}()
/*
f1 defer
func defer
*/
// 这是两个独立的 goroutine, go f1() 对应的 goroutine 会被终止掉
// 但是匿名函数对应的 goroutine 则不会
go func() {
defer fmt.Println("func defer")
go f1()
fmt.Println("func")
}()
/*
func
func defer
f1 defer
*/
for{}
}
channel
可能有人已经发现了,我们虽然实现了并发的效果,但是当前的几个 goroutine 都是独立执行的。而且我们还没有办法接收函数的返回值,因为它是通过 goroutine 的方式启动的,我们不能这样做:res := go f() 、或者 fmt.Println(go f()) 。
那么问题来了,多个 goroutine 之间如何进行通信呢?答案是通过 channel。
什么是 channel
goroutine 和 channel 是 Go 语言并发编程的两大基石,goroutine 用于执行并发任务,channel 用于 goroutine 之间的同步、通信。
channel 在 gouroutine 间架起了一条管道,在管道里传输数据,实现 gouroutine 间的通信;由于它是线程安全的,所以用起来非常方便;channel 还提供 “先进先出” 的特性;它还能影响 goroutine 的阻塞和唤醒。
相信大家都见过一句话:
Do not communicate by sharing memory; instead, share memory by communicating.
不要通过共享内存来通信,而要通过通信来实现内存共享。这就是 Go 的并发哲学,它依赖 CSP 模型,基于 channel 实现。
package main
import (
"fmt"
"time"
)
func main() {
// channel(通道) 可以使用 make 函数创建,
// 比如: chan 是一个关键字, 表示这是一个通道, int 表示通道里面存放数据的类型
// 如果是 var ch chan int 这种方式, 那么这个 channel 是一个nil
ch := make(chan int)
go func() {
// <-ch, 表示从通道 ch 中将数据取出来
// 当然也可以用变量接收取出来的数据, data := <-ch
time.Sleep(time.Second)
fmt.Println("子goroutine")
<-ch
}()
// ch <- data, 表示将数据 data 写到通过通道 ch 里
ch <- 1
fmt.Println("主goroutine")
/*
子goroutine
主goroutine
*/
}
首先默认情况下使用 make 创建的通道是无缓冲的,无缓冲通道表示通道里面无法存放数据,往里面放数据放不进去,必须同时有人在通道的另一方接数据;而有缓冲的通道则可以存放数据,在将数据放进通道之后就可以离开了。
分析一下代码,首先 ch <- 1 表示主 goroutine 要将整数 1 放进通道ch中。但由于我们这里的通道是无缓冲的,或者说通道的容量为 0,导致主 goroutine 放不进去只能阻塞在这里。如果主 goroutine 想解除阻塞,那就必须有人同时往通道取数据,这样数据就会由通道的一方传递到另一方。
所以只有当子 goroutine 执行到 <-ch 的时候,主 goroutine 才会解除阻塞,这个时候数据会被子 goroutine 取走。如果我们将代码改一下:
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int)
go func() {
time.Sleep(time.Second)
fmt.Println("子goroutine")
//<-ch
}()
ch <- 1
fmt.Println("主goroutine")
/*
子goroutine
fatal error: all goroutines are asleep - deadlock!
*/
}
我们看到发生死锁了,只要程序中处于阻塞状态,并且只剩下一个主 goroutine,就会出现死锁。
有缓冲 channel 和无缓冲 channel
对 channel 的发送和接收操作都会在编译期间转换成为底层的发送接收函数。
channel 分为两种:带缓冲、不带缓冲。对不带缓冲的 channel 进行的操作实际上可以看作 "同步模式",带缓冲的则称为 "异步模式"。
同步模式下,发送方和接收方要同步就绪,只有在两者都 ready 的情况下,数据才能在两者间传输(实际上就是内存拷贝)。否则,任意一方先进行发送或接收操作,都会被挂起,等待另一方的出现才能被唤醒。
异步模式下,在缓冲槽可用的情况下(有剩余容量),发送和接收操作都可以顺利进行。如果容量为 0(缓冲区满了),操作的一方在写入的时候同样会被挂起,直到出现接收操作才会被唤醒。
小结一下:同步模式下,必须要使发送方和接收方配对,操作才会成功,否则会被阻塞;异步模式下,缓冲槽要有剩余容量,操作才会成功,否则也会被阻塞。
那么我们来看看如何创建一个有缓冲的通道:
package main
import (
"fmt"
"time"
)
func main() {
// 创建一个容量为 3 的有缓冲通道
ch := make(chan int, 3)
// 这里都不会阻塞
ch <- 1
ch <- 2
ch <- 3
// 启动五个 goroutine
for i := 1; i < 5; i++ {
go func(i int) {
time.Sleep(time.Second * time.Duration(i))
fmt.Println("子goroutine", i)
<-ch
}(i)
}
// 缓冲区满了, 此时会阻塞,
ch <- 4
ch <- 5
// 如果想要执行到这里, 那么必须要有子 goroutine 从通道中取走两个数据
fmt.Println("主goroutine")
/*
子goroutine 1
子goroutine 2
主goroutine
*/
}
因此主 goroutine 不会等待所有的子 goroutine,它只是在等待有人从通道取走两个数据罢了,因此自由两个子 goroutine 打印了。
往通道放入数据,那么要求通道必须有空闲位置(缓冲区不能满)才可以;同理取数据也是如此,必须要求通道里面有数据才行。
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int, 3)
ch <- 1
for i := 1; i < 5; i++ {
go func(i int) {
time.Sleep(time.Second * time.Duration(i))
fmt.Println("子goroutine", i)
ch <- i
}(i)
}
// 缓冲区没有数据, 所以此时会阻塞
<- ch
<- ch
// 如果想要执行到这里, 那么必须要有子 goroutine 从通道中放入数据
fmt.Println("主goroutine")
/*
子goroutine 1
主goroutine
*/
}
因为一开始通道中已经有一个数据了,所以主 goroutine 会在第二个 <- ch 处出现阻塞,直到有子 goroutine 往通道里面放入数据。
单向 channel
目前的 channel 我们既可以放入数据,也可以取出数据,我们可以让其变成单向的只能放入数据 或 只能取出数据的 channel。
package main
import (
"fmt"
"unsafe"
)
func main() {
// 从箭头的指向我们也能看出, 这是一个只能放入数据的 channel, 并且放入的是整型数据
// 可以 ch1 <- 1, 但是 <- ch报错
var ch1 = make(chan<- int, 3)
// 从箭头的指向我们也能看出, 这是一个只能取出数据的 channel, 并且取出的是整型数据
// 可以 <- ch, 但是 ch <- 1 报错
var ch2 = make(<-chan int, 3)
// channel 本质上是一个指针
fmt.Println(ch1, ch2) // 0xc0000d0000 0xc0000d0080
fmt.Println(unsafe.Sizeof(ch1)) // 8
}
channel 的关闭
channel 是可以关闭的,我们看看如何关闭一个 channel:
package main
import "fmt"
func main() {
var ch = make(chan int)
go func() {
for i:=0; i<5;i++{
ch <- i
}
// 调用 close 函数可以关闭一个 channel
// 只能关闭一次, 对一个已经 close 的 channel 不能再 close
close(ch)
}()
fmt.Println(<-ch) // 0
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
fmt.Println(<-ch) // 3
fmt.Println(<-ch) // 4
// 那么问题来了, 从一个关闭的 channel 里面获取值会发生什么呢?
fmt.Println(<-ch) // 0
fmt.Println(<-ch) // 0
fmt.Println(<-ch) // 0
// 我们看到读取得到是一个零值, 所以问题来了, 我要如何判断这个值到底是零值, 还是这个值本身就是0呢
// 还记得 map 我们是怎么做的吗? 没错, 使用两个变量接收即可
val, flag := <-ch
fmt.Println(val, flag) // 0 false
// 如果通道没有关闭, 那么 flag 是 true, 否则是 false
// 但是向一个关闭的通道写入值是会报错的
}
此外我们还可以使用 range 关键字去遍历获取 channel 中的值:
package main
import "fmt"
func main() {
var ch = make(chan int)
go func() {
for i:=0; i<5;i++{
ch <- i
}
close(ch)
}()
for val := range ch{
fmt.Println(val)
/*
0
1
2
3
4
*/
}
// 我们看到当通道关闭之后, range ch 的遍历就结束了
// 如果通道不结束, 那么不好意思, 这个循环会一直尝试获取
// 假设我们上面 close(ch) 被注释掉了, 那么子 goroutine 结束之后, 主 goroutine 还是会尝试从通道中获取值
// 但是此时程序中只剩下主 goroutine了, 因此会出现死锁
// 所以当 range 遍历一个 channel 时, 循环默认的结束条件是通道被关闭
}
另外,在通道内还有元素的情况下关闭通道,那么内部的元素依旧可以被读取。
func main() {
var ch = make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
// 关闭通道,此时通道内还有三个元素
close(ch)
v, ok := <- ch
// 我们看到之前的元素依旧可以被取出来
fmt.Println(v, ok) // 1 true
for v := range ch {
fmt.Print(v, " ") // 2 3
}
}
channel 搭配 select 关键字使用
如果有多个 channel,我们可以使用 select 进行监听,那么什么是 select 呢?
select 可以监听多个 channel 的写入或读取;
执行 select, 若只有一个 case 通过(不阻塞), 则执行这个 case 块;
若有多个 case 通过, 则随机挑选一个 case 执行;
若所有 case 均阻塞, 且定义了 default 块, 则执行 default 块; 没有定义 default 块, 那么 select 语句阻塞, 直到有 case 被唤醒;
使用 break 可以跳出 select;
package main
import (
"fmt"
)
func main() {
var ch1 = make(chan int)
var ch2 = make(chan string)
go func() {
for {
// select 类似于 switch, 一次只会执行一个分支
select {
case t := <-ch1:
fmt.Println(t)
case t := <-ch2:
fmt.Println(t)
}
}
}()
ch1 <- 123
ch2 <- "xxx"
for{}
/*
123
xxx
*/
}
我们可以通过 select 的方式,实现一个斐波那契数列。
package main
import (
"fmt"
)
func main() {
var ch1 = make(chan int)
var ch2 = make(chan bool)
go func() {
for {
// select类似于switch, 一次只会执行一个分支
select {
case t := <-ch1:
fmt.Println(t)
case ch2 <- true:
}
}
}()
for a, b, n := 1, 1, 1; n < 10; n++{
a, b = a + b, a
ch1 <- a
}
<- ch2
/*
2
3
5
8
13
21
34
55
89
*/
}
channel 实现定时器
我们可以用 channel 实现一个定时器的功能,不过需要搭配 time 模块使用:
package main
import (
"fmt"
"time"
)
func main() {
// 当我这样定义的时候, 两秒钟之后, time.C 里面就会有值
timer := time.NewTimer(time.Second * 2)
//如果没到 2 秒, 那么这里会卡住, 因为 timer.C 是一个 channel, 要 2s 后才会往里面写入值
//但是需要注意的是: 并不是我调用了 <-timer.C, 才开始写入, 而是当我调用 NewTimer 的时候就已经开始了
//如果我 sleep 两秒后, 再调用 <-timer.C, 那么会瞬间打印出值, 因为 2s 已经过去了, timer.C 里面已经有值了
//这里会打印当前时间
fmt.Println(<-timer.C) // 2019-08-28 23:20:51.9973729 +0800 CST m=+2.004240601
// 定时器的重置
// 这里写成 10 s,那么要等到 10 秒之后,time.C 里面才会有值
timer1 := time.NewTimer(time.Second * 10)
timer1.Reset(time.Second) // 这里改成一秒
// 那么 1s 后这里就会打印出值
fmt.Println(<-timer1.C) // 2019-08-28 23:20:53.0093997 +0800 CST m=+3.016267401
// 定时器的停止
timer2 := time.NewTimer(time.Second * 10)
timer2.Stop()
// fmt.Println(<-timer2.C) 一旦定时器停止,time2.C 这个通道也就关闭了,如果再试图从 timer2.C 里面读取数据,就会造成死锁。
}
除了 NewTimer 之外,还有一个 NewTicker,只不过前者是只获取一次,而后者可以循环获取。
package main
import (
"fmt"
"time"
)
func main() {
//除了 Timer,还有一个 Ticker。和 Timer 用法一样,只不过 Ticker 是循环获取,Timer 只获取一次
ticker := time.NewTicker(time.Second)
for i := 0; i < 5; i++ {
fmt.Printf("第%d次循环 %v\n", i+1, (<-ticker.C).Format("2006-1-2 15:4:5"))
/*
第1次循环 2019-8-28 23:26:14
第2次循环 2019-8-28 23:26:15
第3次循环 2019-8-28 23:26:16
第4次循环 2019-8-28 23:26:17
第5次循环 2019-8-28 23:26:18
*/
}
// 当然定时器也可以停止,但是无法重置,这里不再演示
}
控制并发数量
有时候我们需要定时执行几百个任务,但是并发数量又不能太高,假设是 3 吧,这个时候就可以通过 channel 控制并发数量。
var limit = make(chan int, 3)
func main() {
// …………
for _, w := range work {
go func() {
limit <- 1
w()
<-limit
}()
}
// …………
}
构建一个缓冲型的 channel,容量为 3。接着遍历任务列表,每个任务启动一个 goroutine 去完成。真正执行任务,访问第三方的动作在 w() 中完成,在执行 w() 之前,先要从 limit 中拿 "许可证",拿到许可证之后,才能执行 w(),并且在执行完任务,要将 "许可证" 归还,这样就可以控制同时运行的 goroutine 数。
注意:这里的limit <- 1 放在 func 内部而不是外部,原因如下:
如果在外层, 就是控制系统 goroutine 的数量, 可能会阻塞 for 循环, 影响业务逻辑;
limit 其实和逻辑无关, 只是性能调优, 放在内层和外层的语义不太一样;
还有一点要注意的是,如果 w() 发生 panic,那 "许可证" 可能就还不回去了,因此需要使用 defer 来保证。
channel 小结
- channel 是一个引用类型,所以在它被初始化之前,它的值是 nil,channel 使用 make 函数进行初始化。不初始化的话,为 nil 的 channel 无论收数据还是发数据都会阻塞。
- 向已经关闭的 channel 中写入数据会导致 panic,但是可以读取数据。如果关闭的通道中还有数据(在内部的数据还没有全部被取走的情况下,通道被关闭),那么会读取到通道内部的数据,并且返回值的第二个元素为 true。如果关闭的通道中没有数据了,那么会读取到零值,并且返回值的第二个元素为 false。
- 重复关闭 channel 或者关闭为 nil 的 channel 都会引发 panic。
- 同时监听多个 channel 或者避免阻塞在 channel 上,可以搭配 select 使用,select 会自动监听多个 channel。
- channel 还可以搭配定时器使用,实现定时任务的功能。
使用场景大致分为以下五种:
数据交流:当做并发地 buffer 或 queue,解决成产者-消费者问题,多个 goroutine 可以并发地生产和消费数据
数据传递:一个 goroutine 将数据交给另一个 goroutine,相当于把数据的所有权托付出去(即通过通信来共享内存)
信号通知:一个 goroutine 可以将信号(closing、closed、data ready)传递给另一个或者另一组 goroutine
任务编排:可以让一组 goroutine 按照一定顺序并发或者串行的执行,这就是编排的功能
锁:利用 channel 也可以实现锁的机制,通过共享内存来通信便是借助于对共享数据进行加锁实现的,这也是传统的并发编程处理方式。所以从这里可以看出,channel 和 Lock、Cond 等基本的并发原语是有竞争关系的
练习:使用 Go 操作 ActiveMQ
使用 Go 连接 ActiveMQ 发送数据的话,需要使用一个叫做 stomp 的第三方包,直接 go get github.com/go-stomp/stomp 即可。
生产者:
package main
import (
"fmt"
"github.com/go-stomp/stomp"
"time"
)
func main(){
// 调用 Dial 方法,第一个参数是 "tcp",第二个参数则是 ip:port
// 返回 conn(连接) 和 err(错误)
conn, err:=stomp.Dial("tcp", "47.adsasaads89:61613")
// 错误判断
if err!=nil{
fmt.Println("err =", err)
return
}
// 发送十条数据
for i:=0;i<10;i++ {
// 调用 conn 下的 send 方法,接收三个参数
// 参数一:队列的名字
// 参数二:数据类型,一般是文本类型,直接写 text/plain 即可
// 参数三:内容,记住要转化成 byte 数组的格式
// 返回一个 error
err := conn.Send("testQ", "text/plain", []byte(fmt.Sprintf("message:%d", i)))
if err!=nil{
fmt.Println("err =", err)
}
}
/*
这里为什么要 sleep 一下,那就是 conn.Send 这个过程是不阻塞的
相当于 Send 把数据放到了一个 channel 里面
另一个 goroutine 从 channel 里面去取数据再放到消息队列里面
但是还没等到另一个 goroutine放入数据,此时循环已经结束了
因此最好要 sleep 一下,根据测试,如果不 sleep,那么发送 1000 条数据,
最终进入队列的大概是 980 条数据,这说明了什么
说明了当程序把 1000 条数据放到 channel 里面的时候,另一个 goroutine 只往队列里面放了 980 条
剩余的 20 条还没有来得及放入,程序就结束了
*/
time.Sleep(time.Second * 1)
}
消费者:
package main
import (
"fmt"
"github.com/go-stomp/stomp"
"time"
)
func recv_data(ch chan *stomp.Message) {
// 不断地循环,从 channel 里面获取数据
for {
v := <-ch
// 这里是打印当然还可以做其他的操作,比如写入 hdfs 平台
// v 是 *stomp.Message 类型,属性都在这里面
/*
type Message struct {
// Indicates whether an error was received on the subscription.
// The error will contain details of the error. If the server
// sent an ERROR frame, then the Body, ContentType and Header fields
// will be populated according to the contents of the ERROR frame.
Err error
// Destination the message has been sent to.
Destination string
// MIME content type.
ContentType string // MIME content
// Connection that the message was received on.
Conn *Conn
// Subscription associated with the message.
Subscription *Subscription
// Optional header entries. When received from the server,
// these are the header entries received with the message.
Header *frame.Header
// The ContentType indicates the format of this body.
Body []byte // Content of message
}
*/
fmt.Println(string(v.Body))
}
}
func main() {
// 创建一个 channel,存放的是 *stomp.Message 类型
ch := make(chan *stomp.Message)
// 将 channel 传入函数中
go recv_data(ch)
// 和生产者一样,调用 Dial 方法,返回 conn 和 err
conn, err := stomp.Dial("tcp", "47.dsdsadsa9:61613")
if err != nil {
fmt.Println("err =", err)
}
// 消费者订阅这个队列
// 参数一:队列名
// 参数二:确认信息,直接填默认地即可
sub, err := conn.Subscribe("testQ", stomp.AckMode(stomp.AckAuto))
for { //无限循环
select {
// sub.C 是一个 channel,如果订阅的队列有数据就读取
case v := <-sub.C:
// 读取的数据是一个 *stomp.Message 类型
ch <- v
// 如果 30 秒还没有人发数据的话,就结束
case <-time.After(time.Second * 30):
return
}
}
}
最终打印如下信息:
message:0
message:1
message:2
message:3
message:4
message:5
message:6
message:7
message:8
message:9
小结
这次我们聊了聊如何在go语言中使用并发,可以说并发是go语言的一大卖点。
|
请发表评论