在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
通道(channel) 单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。虽然可以使用共享内存进行数据交换,但是共享内存在不同的goroutine中容易发生竞态问题。为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。 Go语言提倡使用通信的方法代替共享内存,这里通信的方法就是使用通道(channel),如图1-1所示所示。 图1-1 goroutine与channel的通信 通道的特性 Go 语言中的通道(channel)是一种特殊的类型。在任何时候,同时只能有一个 goroutine 访问通道进行发送和获取数据。goroutine 间通过通道就可以通信。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。 声明通道类型 通道本身需要一个类型进行修饰,就像切片类型需要标识元素类型。通道的元素类型就是在其内部传输的数据类型,声明如下: var 通道变量 chan 通道类型
chan 类型的空值是 nil,声明后需要配合 make 后才能使用。 创建通道 通道是引用类型,需要使用 make 进行创建,格式如下: 通道实例 := make(chan 数据类型)
例如: ch1 := make(chan int) // 创建一个整型类型的通道 ch2 := make(chan interface{}) // 创建一个空接口类型的通道, 可以存放任意格式 type Equip struct{ /* 一些字段 */ } ch2 := make(chan *Equip) // 创建Equip指针类型的通道, 可以存放*Equip
使用通道发送数据 通道创建后,就可以使用通道进行发送和接收操作。 1.通道发送数据的格式
2.通过通道发送数据的例子 使用 make 创建一个通道后,就可以使用<-向通道发送数据,代码如下: // 创建一个空接口通道 ch := make(chan interface{}) // 将0放入通道中 ch <- 0 // 将hello字符串放入通道中 ch <- "hello"
3.发送将持续阻塞直到数据被接收 把数据往通道中发送时,如果接收方一直都没有接收,那么发送操作将持续阻塞。Go 程序运行时能智能地发现一些永远无法发送成功的语句并做出提示,代码如下: package main func main() { // 创建一个整型通道 ch := make(chan int) // 尝试将0通过通道发送 ch <- 0 }
运行代码,报错: fatal error: all goroutines are asleep - deadlock!
报错的意思是:运行时发现所有的goroutine(包括main)都处于等待goroutine。也就是说所goroutine中的channel并没有形成发送和接收对应的代码。 使用通道接收数据 通道接收同样使用<-操作符,通道接收有如下特性:
通道的数据接收一共有以下 4 种写法。 1.阻塞接收数据 阻塞模式接收数据时,将接收变量作为<-操作符的左值,格式如下: data := <-ch
执行该语句时将会阻塞,直到接收到数据并赋值给 data 变量。 2.非阻塞接收数据 使用非阻塞方式从通道接收数据时,语句不会发生阻塞,格式如下: data, ok := <-ch
data:表示接收到的数据。未接收到数据时,data 为通道类型的零值。 非阻塞的通道接收方法可能造成高的 CPU 占用,因此使用非常少。如果需要实现接收超时检测,可以配合 select 和计时器 channel 进行,后面还会再介绍。 3.接收任意数据,忽略接收的数据 阻塞接收数据后,忽略从通道返回的数据,格式如下: <-ch
执行该语句时将会发生阻塞,直到接收到数据,但接收到的数据会被忽略。这个方式实际上只是通过通道在 goroutine 间阻塞收发实现并发同步。 使用通道做并发同步的写法,可以参考下面的例子: package main import ( "fmt" ) func main() { // 构建一个通道 ch := make(chan int) // 开启一个并发匿名函数 go func() { fmt.Println("start goroutine") // 通过通道通知main的goroutine ch <- 0 fmt.Println("exit goroutine") }() fmt.Println("wait goroutine") // 等待匿名goroutine <-ch fmt.Println("all done") }
代码说明如下:
执行代码,输出如下: wait goroutine start goroutine exit goroutine all done
4.循环接收 for data := range ch { }
通道ch 是可以进行遍历的,遍历的结果就是接收到的数据。数据类型就是通道的数据类型。通过for遍历获得的变量只有一个,即上面例子中的data。 遍历通道数据的例子请参考下面的代码。 使用 for 从通道中接收数据: package main import ( "fmt" "time" ) func main() { // 构建一个通道 ch := make(chan int) // 开启一个并发匿名函数 go func() { // 从3循环到0 for i := 3; i >= 0; i-- { // 发送3到0之间的数值 ch <- i // 每次发送完时等待 time.Sleep(time.Second) } }() // 遍历接收通道数据 for data := range ch { // 打印通道数据 fmt.Println(data) // 当遇到数据0时, 退出接收循环 if data == 0 { break } } }
代码说明如下:
执行代码,输出如下: 3 2 1 0
并发打印 上面的例子创建的都是无缓冲通道。使用无缓冲通道往里面装入数据时,装入方将被阻塞,直到另外通道在另外一个goroutine中被取出。同样,如果通道中没有放入任何数据,接收方试图从通道中获取数据时,同样也是阻塞。发送和接收的操作是同步完成的。 下面通过一个并发打印的例子,将goroutine和channel放在一起展示它们的用法。 package main import ( "fmt" ) func printer(c chan int) { // 开始无限循环等待数据 for { // 从channel中获取一个数据 data := <-c // 将0视为数据结束 if data == 0 { break } // 打印数据 fmt.Println(data) } // 通知main已经结束循环(我搞定了!) c <- 0 } func main() { // 创建一个channel c := make(chan int) // 并发执行printer, 传入channel go printer(c) for i := 1; i <= 10; i++ { // 将数据通过channel投送给printer c <- i } // 通知并发的printer结束循环(没数据啦!) c <- 0 // 等待printer结束(搞定喊我!) <-c }
代码说明如下:
代码说明如下: 1 2 3 4 5 6 7 8 9 10
本例的设计模式就是典型的生产者和消费者。生产者是第37行的循环,而消费者是printer()函数。整个例子使用了两个goroutine,一个是main(),一个是通过第35行printer()函数创建的goroutine。两个goroutine通过第32行创建的通道进行通信。这个通道有下面两重功能。 数据传送:第40行中发送数据和第13行接收数据。 控制指令:类似于信号量的功能。同步goroutine的操作。功能简单描述为:
单向通道 Go的通道可以在声明时约束其操作方向,如只发送或只接收。这种被约束方向的通道被称作单向通道。 1.单向通道的声明格式 只能发送的通道类型为chan<-,只能接收的通道类型为<-chan,格式如下: var 通道实例 chan<- 元素类型 // 只能发送通道 var 通道实例 <-chan 元素类型 // 只能接收通道
2.单向通道的使用例子 示例代码如下: ch := make(chan int) // 声明一个只能发送的通道类型, 并赋值为ch var chSendOnly chan<- int = ch //声明一个只能接收的通道类型, 并赋值为ch var chRecvOnly <-chan int = ch
上面的例子中,chSendOnly只能发送数据,如果尝试接收数据,将会出现如下报错: invalid operation: <-chSendOnly (receive from send-only type chan<- int)
同理,chRecvOnly也是不能发送的。当然,使用make创建通道时,也可以创建一个只发送或只读取的通道: ch := make(<-chan int) var chReadOnly <-chan int = ch <-chReadOnly
上面代码编译正常,运行也是正确的。但是,一个不能填充数据(发送)只能读取的通道是毫无意义的。 time包中的单向通道 time包中的计时器会返回一个timer实例,代码如下: timer := time.NewTimer(time.Second)
timer的Timer类型定义如下: type Timer struct { C <-chan Time r runtimeTimer }
第2行中C通道的类型就是一种只能接收的单向通道。如果此处不进行通道方向约束,一旦外部向通道发送数据,将会造成其他使用到计时器的地方逻辑产生混乱。因此,单向通道有利于代码接口的严谨性。 Go语言带缓冲的通道 在无缓冲通道的基础上,为通道增加一个有限大小的存储空间形成带缓冲通道。带缓冲通道在发送时无需等待接收方接收即可完成发送过程,并且不会发生阻塞,只有当存储空间满时才会发生阻塞。同理,如果缓冲通道中有数据,接收时将不会发生阻塞,直到通道中没有数据可读时,通道将会再度阻塞。 无缓冲通道保证收发过程同步。无缓冲收发过程类似于快递员给你电话让你下楼取快递,整个递交快递的过程是同步发生的,你和快递员不见不散。但这样做快递员就必须等待所有人下楼完成操作后才能完成所有投递工作。如果快递员将快递放入快递柜中,并通知用户来取,快递员和用户就成了异步收发过程,效率可以有明显的提升。带缓冲的通道就是这样的一个“快递柜”。 1.创建带缓冲通道 如何创建带缓冲的通道呢?参见如下代码: 通道实例 := make(chan 通道类型, 缓冲大小)
下面通过一个例子中来理解带缓冲通道的用法,参见下面的代码: package main import "fmt" func main() { // 创建一个3个元素缓冲大小的整型通道 ch := make(chan int, 3) // 查看当前通道的大小 fmt.Println(len(ch)) // 发送3个整型元素到通道 ch <- 1 ch <- 2 ch <- 3 // 查看当前通道的大小 fmt.Println(len(ch)) }
代码说明如下:
代码输出如下: 0 3
2.阻塞条件 带缓冲通道在很多特性上和无缓冲通道是类似的。无缓冲通道可以看作是长度永远为0的带缓冲通道。因此根据这个特性,带缓冲通道在下面列举的情况下依然会发生阻塞:
为什么Go语言对通道要限制长度而不提供无限长度的通道? Go语言通道的多路复用 多路复用是通信和网络中的一个专业术语。多路复用通常表示在一个信道上传输多路信号或数据流的过程和技术。 提示:报话机同一时刻只能有一边进行收或者发的单边通信,报话机需要遵守的通信流程如下:
电话可以在说话的同时听到对方说话,所以电话是一种多路复用的设备,一条通信线路上可以同时接收或者发送数据。同样的,网线、光纤也都是基于多路复用模式来设计的,网线、光纤不仅可支持同时收发数据,还支持多个人同时收发数据。 在使用通道时,想同时接收多个通道的数据是一件困难的事情。通道在接收数据时,如果没有数据可以接收将会发生阻塞。虽然可以使用如下模式进行遍历,但运行性能会非常差。 for{ // 尝试接收ch1通道 data, ok := <-ch1 // 尝试接收ch2通道 data, ok := <-ch2 // 接收后续通道 … }
Go语言中提供了select关键字,可以同时响应多个通道的操作。select的每个case都会对应一个通道的收发过程。当收发完成时,就会触发case中响应的语句。多个操作在每次select中挑选一个进行响应。格式如下: select{ case 操作1: 响应操作1 case 操作2: 响应操作2 … default: 没有操作情况 }
操作1、操作2:包含通道收发语句,请参考表1-1:
响应操作1、响应操作2:当操作发生时,会执行对应 case 的响应操作。default:当没有任何操作时,默认执行 default 中的语句。 Go语言RPC 服务器开发中会使用RPC(Remote Procedure Call,远程过程调用)简化进程间通信的过程。RPC 能有效地封装通信过程,让远程的数据收发通信过程看起来就像本地的函数调用一样。 本例中,使用通道代替socket实现RPC的过程。客户端与服务器运行在同一个进程,服务器和客户端在两个goroutine中运行。 1.客户端请求和接收封装 下面的代码封装了向服务器请求数据,等待服务器返回数据,如果请求方超时,该函数还会处理超时逻辑。 // 模拟RPC客户端的请求和接收消息封装 func RPCClient(ch chan string, req string) (string, error) { // 向服务器发送请求 ch <- req // 等待服务器返回 select { case ack := <-ch: // 接收到服务器返回数据 return ack, nil case <-time.After(time.Second): // 超时 return "", errors.New("Time out") } }
代码说明如下:
RPCClient()函数中,执行到select语句时,第9行和第11行的通道操作会同时开启。如果第9行的通道先返回,则执行第10行逻辑,表示正常接收到服务器数据;如果第11行的通道先返回,则执行第12行的逻辑,表示请求超时,返回错误。 2.服务器接收和反馈数据 // 模拟RPC服务器端接收客户端请求和回应 func RPCServer(ch chan string) { for { // 接收客户端请求 data := <-ch // 打印接收到的数据 fmt.Println("server received:", data) //向客户端反馈已收到 ch <- "roger" } }
代码说明如下:
运行整个程序,客户端可以正确收到服务器返回的数据,客户端RPCClient()函数的代码按下面代码中第三行分支执行。 // 等待服务器返回 select { case ack := <-ch: // 接收到服务器返回数据 return ack, nil case <-time.After(time.Second): // 超时 return "", errors.New("Time out") }
程序输出如下: server received: hi client received roger
3.模拟超时 上面的例子虽然有客户端超时处理,但是永远不会触发,因为服务器的处理速度很快,也没有真正的网络延时或者“服务器宕机”的情况。因此,为了展示select中超时的处理,在服务器逻辑中增加一条语句,故意让服务器延时处理一段时间,造成客户端请求超时,代码如下: // 模拟RPC服务器端接收客户端请求和回应 func RPCServer(ch chan string) { for { // 接收客户端请求 data := <-ch // 打印接收到的数据 fmt.Println("server received:", data) // 通过睡眠函数让程序执行阻塞2秒的任务 time.Sleep(time.Second * 2) // 反馈给客户端收到 ch <- "roger" } }
第11行中,time.Sleep()函数会让goroutine执行暂停2秒。使用这种方法模拟服务器延时,造成客户端超时。客户端处理超时1秒时通道就会返回: // 等待服务器返回 select { case ack := <-ch: // 接收到服务器返回数据 return ack, nil case <-time.After(time.Second): // 超时 return "", errors.New("Time out") }
4.主流程 主流程中会创建一个无缓冲的字符串格式通道。将通道传给服务器的RPCServer()函数,这个函数并发执行。使用RPCClient()函数通过ch对服务器发出RPC请求,同时接收服务器反馈数据或者等待超时。参考下面代码: func main() { // 创建一个无缓冲字符串通道 ch := make(chan string) // 并发执行服务器逻辑 go RPCServer(ch) // 客户端请求数据和接收数据 recv, err := RPCClient(ch, "hi") if err != nil { // 发生错误打印 fmt.Println(err) } else { // 正常接收到数据 fmt.Println("client received", recv) } }
代码说明如下:
完成代码: package main import ( "errors" "fmt" "time" ) // 模拟RPC客户端的请求和接收消息封装 func RPCClient(ch chan string, req string) (string, error) { // 向服务器发送请求 ch <- req // 等待服务器返回 select { case ack := <-ch: // 接收到服务器返回数据 return ack, nil case <-time.After(time.Second): // 超时 return "", errors.New("Time out") } } // 模拟RPC服务器端接收客户端请求和回应 func RPCServer(ch chan string) { for { // 接收客户端请求 data := <-ch // 打印接收到的数据 fmt.Println("server received:", data) // 反馈给客户端收到 ch <- "roger" } } func main() { // 创建一个无缓冲字符串通道 ch := make(chan string) // 并发执行服务器逻辑 go RPCServer(ch) // 客户端请求数据和接收数据 recv, err := RPCClient(ch, "hi") if err != nil { // 发生错误打印 fmt.Println(err) } else { // 正常接收到数据 fmt.Println("client received", recv) } }
使用通道响应计时器的事件 Go语言中的time包提供了计时器的封装。由于Go语言中的通道和goroutine的设计,定时任务可以在goroutine中通过同步的方式完成,也可以通过在goroutine中异步回调完成。这里将分两种用法进行例子展示。 1.一段时间之后(time.After) package main import ( "fmt" "time" ) func main() { // 声明一个退出用的通道 exit := make(chan int) // 打印开始 fmt.Println("start") // 过1秒后, 调用匿名函数 time.AfterFunc(time.Second, func() { // 1秒后, 打印结果 fmt.Println("one second after") // 通知main()的goroutine已经结束 exit <- 0 }) // 等待结束 <-exit }
代码说明如下:
time.AfterFunc()函数是在time.After基础上增加了到时的回调,方便使用。而time.After()函数又是在time.NewTimer()函数上进行的封装,下面的例子展示如何使用timer.NewTimer()和time.NewTicker()。 2.定点计时 计时器(Timer)的原理和倒计时闹钟类似,都是给定多少时间后触发。打点器(Ticker)的原理和钟表类似,钟表每到整点就会触发。这两种方法创建后会返回time.Ticker对象和time.Timer对象,里面通过一个C成员,类型是只能接收的时间通道(<-chanTime),使用这个通道就可以获得时间触发的通知。 下面代码创建一个打点器,每500毫秒触发一起;创建一个计时器,2秒后触发,只触发一次。 package main import ( "fmt" "time" ) func main() { // 创建一个打点器, 每500毫秒触发一次 ticker := time.NewTicker(time.Millisecond * 500) // 创建一个计时器, 2秒后触发 stopper := time.NewTimer(time.Second * 2) // 声明计数变量 var i int // 不断地检查通道情况 for { // 多路复用通道 select { case <-stopper.C: // 计时器到时了 fmt.Println("stop") // 跳出循环 goto StopHere case <-ticker.C: // 打点器触发了 // 记录触发了多少次 i++ fmt.Println("tick", i) } } // 退出的标签, 使用goto跳转 StopHere: fmt.Println("done") }
代码说明如下:
关闭通道后继续使用通道 通道是一个引用对象,和map类似。map在没有任何外部引用时,Go程序在运行时(runtime)会自动对内存进行垃圾回收(GarbageCollection,GC)。类似的,通道也可以被垃圾回收,但是通道也可以被主动关闭。 1.格式 使用 close() 来关闭一个通道: close(ch)
关闭的通道依然可以被访问,访问被关闭的通道将会发生一些问题。 2.给被关闭通道发送数据将会触发panic 被关闭的通道不会被置为 nil。如果尝试对已经关闭的通道进行发送,将会触发宕机,代码如下: package main import "fmt" func main() { // 创建一个整型的通道 ch := make(chan int) // 关闭通道 close(ch) // 打印通道的指针, 容量和长度 fmt.Printf("ptr:%p cap:%d len:%d\n", ch, cap(ch), len(ch)) // 给关闭的通道发送数据 ch <- 1 }
代码说明如下:
代码运行后触发宕机: ptr:0xc042052060 cap:0 len:0 panic: send on closed channel
提示触发宕机的原因是给一个已经关闭的通道发送数据。 3.从已关闭的通道接收数据时将不会发生阻塞 从已经关闭的通道接收数据或者正在接收数据时,将会接收到通道类型的零值,然后停止阻塞并返回。 操作关闭后的通道: package main import "fmt" func main() { // 创建一个整型带两个缓冲的通道 ch := make(chan int, 2) // 给通道放入两个数据 ch <- 0 ch <- 1 // 关闭缓冲 close(ch) // 遍历缓冲所有数据, 且多遍历1个 for i := 0; i < cap(ch)+1; i++ { // 从通道中取出数据 v, ok := <-ch // 打印取出数据的状态 fmt.Println(v, ok) } }
代码说明如下:
代码运行结果如下: 0 true 1 true 0 false
运行结果前两行正确输出带缓冲通道的数据,表明缓冲通道在关闭后依然可以访问内部的数据。 运行结果第三行的“0false”表示通道在关闭状态下取出的值。0表示这个通道的默认值,false表示没有获取成功,因为此时通道已经空了。我们发现,在通道关闭后,即便通道没有数据,在获取时也不会发生阻塞,但此时取出数据会失败。 |
请发表评论