1. channel的使用
channel,通道。golang中用于数据传递的一种数据结构。是golang中一种传递数据的方式,也可用作事件通知。
1.1 声明、传值、关闭
使用chan关键字声明一个通道,在使用前必须先创建,操作符 <- 用于指定通道的方向,发送或接收。如果未指定方向,则为双向通道。
1 //声明和创建
2 var ch chan int // 声明一个传递int类型的channel
3 ch := make(chan int) // 使用内置函数make()定义一个channel
4 ch2 := make(chan interface{}) // 创建一个空接口类型的通道, 可以存放任意格式
5
6 type Equip struct{ /* 一些字段 */ }
7 ch2 := make(chan *Equip) // 创建Equip指针类型的通道, 可以存放*Equip
8
9 //传值
10 ch <- value // 将一个数据value写入至channel,这会导致阻塞,直到有其他goroutine从这个channel中读取数据
11 value := <-ch // 从channel中读取数据,如果channel之前没有写入数据,也会导致阻塞,直到channel中被写入数据为止
12
13 ch := make(chan interface{}) // 创建一个空接口通道
14 ch <- 0 // 将0放入通道中
15 ch <- "hello" // 将hello字符串放入通道中
16
17 //关闭
18 close(ch) // 关闭channel
把数据往通道中发送时,如果接收方一直都没有接收,那么发送操作将持续阻塞。Go 程序运行时能智能地发现一些永远无法发送成功的语句并报错:
fatal error: all goroutines are asleep - deadlock!
//运行时发现所有的 goroutine(包括main)都处于等待 goroutine。
1.2 四种重要的通道使用方式
无缓冲通道
通道默认是无缓冲的,无缓冲通道上的发送操作将会被阻塞,直到有其他goroutine从对应的通道上执行接收操作,数据传送完成,通道继续工作。
package main
import (
"fmt"
"time"
)
var done chan bool
func HelloWorld() {
fmt.Println("Hello world goroutine")
// time.Sleep(1*time.Second)
done <- true
}
func main() {
done = make(chan bool) // 创建一个channel
go HelloWorld()
<-done
}
1 //输出
2 //Hello world goroutine
由于main不会等goroutine执行结束才返回,所以可加sleep输出为了可以看到goroutine的输出内容,那么在这里由于是阻塞的,所以无需sleep。
将代码中”done <- true”和”<-done”,去掉再执行,没有上面的输出内容。
管道
通道可以用来连接goroutine,一边的输入是另一边输出。这就叫做管道:
1 package main
2
3 import (
4 "fmt"
5 "time"
6 )
7 var echo chan string
8 var receive chan string
9
10 // 定义goroutine 1
11 func Echo() {
12 time.Sleep(1*time.Second)
13 echo <- "这是一次测试"
14 }
15
16 // 定义goroutine 2
17 func Receive() {
18 temp := <- echo // 阻塞等待echo的通道的返回
19 receive <- temp
20 }
21
22
23 func main() {
24 echo = make(chan string)
25 receive = make(chan string)
26
27 go Echo()
28 go Receive()
29
30 getStr := <-receive // 接收goroutine 2的返回
31
32 fmt.Println(getStr)
33 }
输出字符串:"这是一次测试"。
在这里不一定要去关闭channel,因为底层的垃圾回收机制会根据它是否可以访问来决定是否自动回收它。(这里不是根据channel是否关闭来决定的)
单向通道类型
1 package main
2
3 import (
4 "fmt"
5 "time"
6 )
7
8 // 定义goroutine 1
9 func Echo(out chan<- string) { // 定义输出通道类型
10 time.Sleep(1*time.Second)
11 out <- "这又是一次测试"
12 close(out)
13 }
14
15 // 定义goroutine 2
16 func Receive(out chan<- string, in <-chan string) { // 定义输出通道类型和输入类型
17 temp := <-in // 阻塞等待echo的通道的返回
18 out <- temp
19 close(out)
20 }
21
22
23 func main() {
24 echo := make(chan string)
25 receive := make(chan string)
26
27 go Echo(echo)
28 go Receive(receive, echo)
29
30 getStr := <-receive // 接收goroutine 2的返回
31
32 fmt.Println(getStr)
33 }
输出:这又是一次测试。
缓冲管道
goroutine的通道默认是是阻塞的,那么有什么办法可以缓解阻塞? 答案是:加一个缓冲区。
创建一个缓冲通道:
1 ch := make(chan string, 3) // 创建了缓冲区为3的通道
2
3 //==
4 len(ch) // 长度计算
5 cap(ch) // 容量计算
缓冲通道传递数据示意图:
2. 内部结构
Go语言channel是first-class的,意味着它可以被存储到变量中,可以作为参数传递给函数,也可以作为函数的返回值返回。作为Go语言的核心特征之一,虽然channel看上去很高端,但是其实它仅仅就是一个数据结构而已,具体定义在 $GOROOT/src/runtime/chan.go 里。如下:
1 type hchan struct {
2 qcount uint // 队列中的总数据
3 dataqsiz uint // 循环队列的大小
4 buf unsafe.Pointer // 指向dataqsiz元素数组 指向环形队列
5 elemsize uint16 //
6 closed uint32
7 elemtype *_type // 元素类型
8 sendx uint // 发送索引
9 recvx uint // 接收索引
10 recvq waitq // 接待员名单, 因recv而阻塞的等待队列。
11 sendq waitq // 发送服务员列表, 因send而阻塞的等待队列。
12 //锁定保护hchan中的所有字段,以及几个在此通道上阻止的sudogs中的字段。
13 //按住此锁定时不要更改另一个G的状态(尤其是不要准备G),因为这可能会导致死锁堆栈缩小。
14 lock mutex
15 }
其核心是存放channel数据的环形队列,由qcount和elemsize分别指定了队列的容量和当前使用量。dataqsize是队列的大小。elemalg是元素操作的一个Alg结构体,记录下元素的操作,如copy函数,equal函数,hash函数等。
如果是带缓冲区的chan,则缓冲区数据实际上是紧接着Hchan结构体中分配的。不带缓冲的 channel ,环形队列 size 则为 0。
1 c = (Hchan*)runtime.mal(n + hint*elem->size);
另一重要部分是recvq和sendq两个双向链表,前者是等待读通道(<-channel)的goroutine队列,后者是等待写通道(channel <- xxx)的goroutine队列。若一个goroutine阻塞于channel了,那么它就被挂在recvq或sendq队列中。WaitQ是链表的定义,包含一个头结点和一个尾结点:
1 struct WaitQ
2 {
3 SudoG* first;
4 SudoG* last;
5 };
队列中的每个成员是一个SudoG结构体变量:
1 struct SudoG
2 {
3 G* g; // g和selgen构成
4 uint32 selgen; // 指向g的弱指针
5 SudoG* link;
6 int64 releasetime;
7 byte* elem; // 数据元素
8 };
SudoG里主要结构是一个g和一个elem。elem用于存储goroutine的数据。读通道时,数据会从Hchan的buf队列中拷贝到SudoG的elem域。写通道时,数据则是由SudoG的elem域拷贝到Hchan的队列中。
-
buf 是有缓冲的channel所特有的结构,用来存储缓存数据。是个循环链表
-
sendx 和recvx 用于记录buf 这个循环链表中的发送或者接收的index
-
lock 是个互斥锁。
从最基本的开始-创建channel
创建一个缓冲channel
1 ch := make(chan int, 3) //
在运行时库中会执行:
1 void runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)
- 第一,加锁
- 第二,把数据从goroutine中copy到“队列”中(或者从队列中copy到goroutine中)。
- 第三,释放锁
当协程尝试从未关闭的 channel 中读取数据时,内部的操作如下:
- 当 buf 非空时,此时 recvq 必为空,buf 弹出一个元素给读协程,读协程获得数据后继续执行,此时若 sendq 非空,则从 sendq 中弹出一个写协程转入 running 状态,待写数据入队列 buf ,此时读取操作
<- ch 未阻塞;
- 当 buf 为空但 sendq 非空时(不带缓冲的 channel),则从 sendq 中弹出一个写协程转入 running 状态,待写数据直接传递给读协程,读协程继续执行,此时读取操作
<- ch 未阻塞;
- 当 buf 为空并且 sendq 也为空时,读协程入队列 recvq 并转入 blocking 状态,当后续有其他协程往 channel 写数据时,读协程才会重新转入 running 状态,此时读取操作
<- ch 阻塞。
类似的,当协程尝试往未关闭的 channel 中写入数据时,内部的操作如下:
- 当队列 recvq 非空时,此时队列 buf 必为空,从 recvq 弹出一个读协程接收待写数据,此读协程此时结束阻塞并转入 running 状态,写协程继续执行,此时写入操作
ch <- 未阻塞;
- 当队列 recvq 为空但 buf 未满时,此时 sendq 必为空,写协程的待写数据入 buf 然后继续执行,此时写入操作
ch <- 未阻塞;
- 当队列 recvq 为空并且 buf 为满时,此时写协程入队列 sendq 并转入 blokcing 状态,当后续有其他协程从 channel 中读数据时,写协程才会重新转入 running 状态,此时写入操作
ch <- 阻塞。
当关闭 non-nil channel 时,内部的操作如下:
- 当队列 recvq 非空时,此时 buf 必为空,recvq 中的所有协程都将收到对应类型的零值然后结束阻塞状态;
- 当队列 sendq 非空时,此时 buf 必为满,sendq 中的所有协程都会产生 panic ,在 buf 中数据仍然会保留直到被其他协程读取。
空通道是指将一个channel赋值为nil,或者定义后不调用make进行初始化。按照Go语言的语言规范,读写空通道是永远阻塞的。其实在函数runtime.chansend和runtime.chanrecv开头就有判断这类情况,如果发现参数c是空的,则直接将当前的goroutine放到等待队列,状态设置为waiting。
读一个关闭的通道,永远不会阻塞,会返回一个通道数据类型的零值。这个实现也很简单,将零值复制到调用函数的参数ep中。写一个关闭的通道,则会panic。关闭一个空通道,也会导致panic。
3. channel的高级用法
3.1 条件变量(condition variable)
类型于 POSIX 接口中线程通知其他线程某个事件发生的条件变量,channel 的特性也可以用来当成协程之间同步的条件变量。因为 channel 只是用来通知,所以 channel 中具体的数据类型和值并不重要,这种场景一般用 struct {} 作为 channel 的类型。
一对一通知
类似 pthread_cond_signal() 的功能,用来在一个协程中通知另个某一个协程事件发生:
1 package main
2
3 import (
4 "fmt"
5 "time"
6 )
7
8 func main() {
9 ch := make(chan struct{})
10 nums := make([]int, 100)
11
12 go func() {
13 time.Sleep(time.Second)
14 for i := 0; i < len(nums); i++ {
15 nums[i] = i
16 }
17 // send a finish signal
18 ch <- struct{}{}
19 }()
20
21 // wait for finish signal
22 <-ch
23 fmt.Println(nums)
24 }
广播通知
类似 pthread_cond_broadcast() 的功能。利用从已关闭的 channel 读取数据时总是非阻塞的特性,可以实现在一个协程中向其他多个协程广播某个事件发生的通知:
1 package main
2
3 import (
4 "fmt"
5 "time"
6 )
7
8 func main() {
9 N := 10
10 exit := make(chan struct{})
11 done := make(chan struct{}, N)
12
13 // start N worker goroutines
14 for i := 0; i < N; i++ {
15 go func(n int) {
16 for {
17 select {
18 // wait for exit signal
19 case <-exit:
20 fmt.Printf("worker goroutine #%d exit\n", n)
21 done <- struct{}{}
22 return
23 case <-time.After(time.Second):
24 fmt.Printf("worker goroutine #%d is working...\n", n)
25 }
26 }
27 }(i)
28 }
29
30 time.Sleep(3 * time.Second)
31 // broadcast exit signal
32 close(exit)
33 // wait for all worker goroutines exit
34 for i := 0; i < N; i++ {
35 <-done
36 }
37 fmt.Println("main goroutine exit")
38 }
3.2 信号量
channel 的读/写相当于信号量的 P / V 操作,下面的示例程序中 channel 相当于信号量:
1 package main
2
3 import (
4 "log"
5 "math/rand"
6 "time"
7 )
8
9 type Seat int
10 type Bar chan Seat
11
12 func (bar Bar) ServeConsumer(customerId int) {
13 log.Print("-> consumer#", customerId, " enters the bar")
14 seat := <-bar // need a seat to drink
15 log.Print("consumer#", customerId, " drinks at seat#", seat)
16 time.Sleep(time.Second * time.Duration(2+rand.Intn(6)))
17 log.Print("<- consumer#", customerId, " frees seat#", seat)
18 bar <- seat // free the seat and leave the bar
19 }
20
21 func main() {
22 rand.Seed(time.Now().UnixNano())
23
24 bar24x7 := make(Bar, 10) // the bar has 10 seats
25 // Place seats in an bar.
26 for seatId := 0; seatId < cap(bar24x7); seatId++ {
27 bar24x7 <- Seat(seatId) // none of the sends will block
28 }
29
30 // a new consumer try to enter the bar for each second
31 for customerId := 0; ; customerId++ {
32 time.Sleep(time.Second)
33 go bar24x7.ServeConsumer(customerId)
34 }
35 }
3.3 互斥量
互斥量相当于二元信号里,所以 cap 为 1 的 channel 可以当成互斥量使用:
1 package main
2
3 import "fmt"
4
5 func main() {
6 mutex := make(chan struct{}, 1) // the capacity must be one
7
8 counter := 0
9 increase := func() {
10 mutex <- struct{}{} // lock
11 counter++
12 <-mutex // unlock
13 }
14
15 increase1000 := func(done chan<- struct{}) {
16 for i := 0; i < 1000; i++ {
17 increase()
18 }
19 done <- struct{}{}
20 }
21
22 done := make(chan struct{})
23 go increase1000(done)
24 go increase1000(done)
25 <-done; <-done
26 fmt.Println(counter) // 2000
27 }
4. 关闭 channel
关闭不再需要使用的 channel 并不是必须的。跟其他资源比如打开的文件、socket 连接不一样,这类资源使用完后不关闭后会造成句柄泄露,channel 使用完后不关闭也没有关系,channel 没有被任何协程用到后最终会被 GC 回收。关闭 channel 一般是用来通知其他协程某个任务已经完成了。golang 也没有直接提供判断 channel 是否已经关闭的接口,虽然可以用其他不太优雅的方式自己实现一个:
1 func isClosed(ch chan int) bool {
2 select {
3 case <-ch:
4 return true
5 default:
6 }
7 return false
8 }
不过实现一个这样的接口也没什么必要。因为就算通过 isClosed() 得到当前 channel 当前还未关闭,如果试图往 channel 里写数据,仍然可能会发生 panic ,因为在调用 isClosed() 后,其他协程可能已经把 channel 关闭了。 关闭 channel 时应该注意以下准则:
- 不要在读取端关闭 channel ,因为写入端无法知道 channel 是否已经关闭,往已关闭的 channel 写数据会 panic ;
- 有多个写入端时,不要再写入端关闭 channle ,因为其他写入端无法知道 channel 是否已经关闭,关闭已经关闭的 channel 会发生 panic ;
- 如果只有一个写入端,可以在这个写入端放心关闭 channel 。
关闭 channel 粗暴一点的做法是随意关闭,如果产生了 panic 就用 recover 避免进程挂掉。稍好一点的方案是使用标准库的 sync 包来做关闭 channel 时的协程同步,不过使用起来也稍微复杂些。下面介绍一种优雅些的做法。
4.1 一写多读
这种场景下这个唯一的写入端可以关闭 channel 用来通知读取端所有数据都已经写入完成了。读取端只需要用 for range 把 channel 中数据遍历完就可以了,当 channel 关闭时,for range 仍然会将 channel 缓冲中的数据全部遍历完然后再退出循环:
1 package main
2
3 import (
4 "fmt"
5 "sync"
6 )
7
8 func main() {
9 wg := &sync.WaitGroup{}
10 ch := make(chan int, 100)
11
12 send := func() {
13 for i := 0; i < 100; i++ {
14 ch <- i
15 }
16 // signal sending finish
17 close(ch)
18 }
19
20 recv := func(id int) {
21 defer wg.Done()
22 for i := range ch {
23 fmt.Printf("receiver #%d get %d\n", id, i)
24 }
25 fmt.Printf("receiver #%d exit\n", id)
26 }
27
28 wg.Add(3)
29 go recv(0)
30 go recv(1)
31 go recv(2)
32 send()
33
34 wg.Wait()
35 }
4.2 多写一读
这种场景下虽然可以用 sync.Once 来解决多个写入端重复关闭 channel 的问题,但更优雅的办法设置一个额外的 channel ,由读取端通过关闭来通知写入端任务完成不要再继续再写入数据了:
1 package main
2
3 import (
4 "fmt"
5 "sync"
6 )
7
8 func main() {
9 wg := &sync.WaitGroup{}
10 ch := make(chan int, 100)
11 done := make(chan struct{})
12
13 send := func(id int) {
14 defer wg.Done()
15 for i := 0; ; i++ {
16 select {
17 case <-done:
18 // get exit signal
19 fmt.Printf("sender #%d exit\n", id)
20 return
21 case ch <- id*1000 + i:
22 }
23 }
24 }
25
26 recv := func() {
27 count := 0
28 for i := range ch {
29 fmt.Printf("receiver get %d\n", i)
30 count++
31 if count >= 1000 {
32 // signal recving finish
33 close(done)
34 return
35 }
36 }
37 }
38
39 wg.Add(3)
40 go send(0)
41 go send(1)
42 go send(2)
43 recv()
44
45 wg.Wait()
46 }
4.2 多写多读
这种场景稍微复杂,和上面的例子一样,也需要设置一个额外 channel 用来通知多个写入端和读取端。另外需要起一个额外的协程来通过关闭这个 channel 来广播通知:
1 package main
2
3 import (
4 "fmt"
5 "sync"
6 "time"
7 )
8
9 func main() {
10 wg := &sync.WaitGroup{}
11 ch := make(chan int, 100)
12 done := make(chan struct{})
13
14 send := func(id int) {
15 defer wg.Done()
16 for i := 0; ; i++ {
17 select {
18 case <-done:
19 // get exit signal
20 fmt.Printf("sender #%d exit\n", id)
21 return
22 case ch <- id*1000 + i:
23 }
24 }
25 }
26
27 recv := func(id int) {
28 defer wg.Done()
29 for {
30 select {
31 case <-done:
32 // get exit signal
33 fmt.Printf("receiver #%d exit\n", id)
34 return
35 case i := <-ch:
36 fmt.Printf("receiver #%d get %d\n", id, i)
37 time.Sleep(time.Millisecond)
38 }
39 }
40 }
41
42 wg.Add(6)
43 go send(0)
44 go send(1)
45 go send(2)
46 go recv(0)
47 go recv(1)
48 go recv(2)
49
50 time.Sleep(time.Second)
51 // signal finish
52 close(done)
53 // wait all sender and receiver exit
54 wg.Wait()
55 }
channle 作为 golang 最重要的特性,用起来还是比较方便的。传统的 C 里要实现类似的功能的话,一般需要用到 socket 或者 FIFO 来实现,另外还要考虑数据包的完整性与并发冲突的问题,channel 则屏蔽了这些底层细节,使用者只需要考虑读写就可以了。 channel 是引用类型,了解一下 channel 底层的机制对更好的使用 channel 还是很用必要的。虽然操作原语简单,但涉及到阻塞的问题,使用不当可能会造成死锁或者无限制的协程创建最终导致进程挂掉。
|
请发表评论