在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
channel简单示例package main import ( "fmt" "time" ) //channel的创建,发送,接收 func channe1(){ //创建,channel是有类型的 c1 := make(chan int) //接收,在这段程序中接收方必须是一个goroutine,因为只在主程序中发送而不接收,程序会报deadlock //通常使用匿名函数开一个与主程序同时执行的内部方法,即并行执行 go func(){ fmt.Println("接收数据准备") //这里接收channel使用了io输出功能,io是可以被抢占控制权的,即IO的特性 fmt.Println(<- c1) fmt.Println("接收数据完成") //关闭,不显式关闭时,channel会随主程序(即main)的运行结束而结束 //如果“接收”处理数据的时间较长,就会出现主程序已经结束,但接收方还没处理完的情况 //此时可以让主程序sleep一段时间,等待接收方把数据处理完毕再关闭 close(c1) fmt.Println("接收结束") }() //发送数据,“接收”程序要在发送之前准备, //意思就是发送数据之前,要先为channel准备好接收; //否则,执行<- 1将1发送到channel时,go发现没有人接收,会报deadlock c1 <- 1 //接收方与主程序同时执行 //主程序在此停止1毫秒,就相当于主程序等了接收方一毫秒 time.Sleep(time.Millisecond) } func main(){ channe1() fmt.Println("主程序结束") } 输出 # go run chan1.go 接收数据准备 1 接收数据完成 接收结束 主程序结束
channel同步 如果一个动作会触发另外一个动作,那么这个行为通常被称为事件(event);如果这个事件不附带信息,那么此类事件又通常被用于同步。 channel有发送、接收、关闭三个操作; 发送触发接收,如果一个channel不发送,那么接收将处于阻塞。这种同步,可用于消息通知。
package main import ( "fmt" "time" ) func test(){ c := make(chan struct{}) go func(){ fmt.Println("我要花两分钟去看看园子里的花还活着吗") time.Sleep(7*time.Second) c <- struct{}{} }() //程序会在这里等待7秒 <- c //然后打印出下面这句话 fmt.Println("这花从屋里移值出去后,活得比以前更好了") } func main(){ test() }
channel数组package main import ( "fmt" "sync" ) func channel2(){ //WaitGroup的wait(在主程序中调用)与done(在与主程序并行执行的“接收”方中调用)的交互, //可以达到等待所有channel运行完毕,再让主程序运行的效果 //而不是程序员猜想channel“接收”需要多少时间运行, //然后去主程序中设置time.Sleep让主程序等待 var wtg sync.WaitGroup //channel数组 var workers [8]worker for i := 0; i < 8; i++{ //使用引用的方式传送参数,所有的channel公用一个WaitGroup workers[i] = createWorker(i,&wtg) } //要一次性添加完要等待执行的channel个数 wtg.Add(16) for i,worker := range workers { worker.in <- 'a' + i //wtg.Add(1) //这种方式会报错 } for i,worker := range workers { worker.in <- 'A' + i //wtg.Add(1) } //等待所有channel执行完毕,否则一直阻塞 wtg.Wait() fmt.Println("所有channel执行完毕") } func createWorker(id int, wtg *sync.WaitGroup) worker{ //channel作为struct的一个属性 wk := worker{ //chan<-表示channel只用于发送数据,即输入 in : make(chan int), done: func(){ wtg.Done() }, } //channel创建之后,就开始以并行的方式建立“接收”方 go wk.doWork(id) return wk } type worker struct { in chan int done func() } //“接收”方程序 func (wk *worker) doWork(id int){ //接收的确是按数组顺序顺序打印出来的,但这只是程序第一次运行的情况 //接收是在发送之前就以并行的方式运行起来了,之后数据中每个channel都一直处于阻塞等待状态 //也就是说数组中的每个channel谁先打印出数据,就表示该谁先发送数据(忽略channel传送数据时长差异) for n := range wk.in { fmt.Printf("第 %d 次接收的信息为 %c\n",id,n) //通知主程序工作处理完毕 wk.done() } } func main(){ channel2() fmt.Println("主程序结束") } 输出 # go run chann.go 第 0 次接收的信息为 a 第 1 次接收的信息为 b 第 2 次接收的信息为 c 第 3 次接收的信息为 d 第 4 次接收的信息为 e 第 5 次接收的信息为 f 第 6 次接收的信息为 g 第 6 次接收的信息为 G 第 7 次接收的信息为 h 第 7 次接收的信息为 H 第 0 次接收的信息为 A 第 1 次接收的信息为 B 第 2 次接收的信息为 C 第 3 次接收的信息为 D 第 4 次接收的信息为 E 第 5 次接收的信息为 F 所有channel执行完毕 主程序结束 这个例子除了介绍buffer channel,即channel数组的使用外,还涉及一个GO语言的重要思想,引用GO创始人的原话: Don't communicate by sharing memory;share memory by communicating. 不要通过共享内存来通信;通过通信来共享内存。 通过共享内存来通信:程序A运行完的结果返回给标识flag,如果flag为true运行程序B,返之运行程序C; 通过通信来共享内存:当一个channel处理完毕时,不是修改一个变量告诉发送方我处理完了,而通过channel来传达这个信息; 可以再定义一个channel来实现这个功能,此处使用了WaitGroup。
select调度最简单的select调度package main import ( "fmt" ) func main(){ var c1,c2 chan int //nil //select可以在channel为nil的时候就去接收数据, //哪个channel有数据发送过来就进行接收 select { case n:= <- c1: fmt.Println("来自c1:",n) case n:= <- c2: fmt.Println("来自c2:",n) //如果所有channel都无数据 default: fmt.Println("无数据可接收") } } 输出 # go run chan3.go
无数据可接收
select调度是多选一,是阻塞的
select是阻塞的
package main import ( "fmt" "tools" ) func main() { var c1,c2 chan int i:= 0 for { i++ fmt.Println(i) tools.SleepBySec(1) select { case <- c1: fmt.Println(1) case <- c2: fmt.Println(2) } } fmt.Println("over") }
以上这段代码会输出一次1,然后就被阻塞住,select一直监听着两个case的channel是否有数据过来,阻塞着程序,阻塞着for循环。
package main import ( "fmt" ) func channel3() chan int{ out := make(chan int) go func(){ i := 0 for { out <- i i++ } }() return out } func main(){ var c1,c2 = channel3(),channel3() for{ //select可以在channel为nil的时候就去接收数据, //哪个channel有数据发送过来就进行接收 select { case n:= <- c1: fmt.Println("来自c1:",n) case n:= <- c2: fmt.Println("来自c2:",n) /* 这里注释掉了,为什么呢? 这是因为如果不注释掉,大部分程序就会走default 因为在一个时间段内,发送数据只是占用了部分时间片,而不是所有时间 该行注释掉之后,select只有两个选择,输出c1与c2发送过来的数据 //如果所有channel都无数据 default: fmt.Println("无数据可接收") */ } } } 输出,只取一部分输出数据 来自c2: 17529 来自c2: 17530 来自c2: 17531 来自c2: 17532 来自c2: 17533 来自c2: 17534 来自c2: 17535 来自c2: 17536 来自c2: 17537 来自c1: 9834 来自c1: 9835 来自c1: 9836 来自c1: 9837 来自c1: 9838 来自c1: 9839 来自c1: 9840 来自c1: 9841 来自c1: 9842 来自c1: 9843
如果select中的default不注释 package main import ( "fmt" ) func channel3() chan int{ out := make(chan int) go func(){ i := 0 for { out <- i i++ } }() return out } func main(){ var c1,c2 = channel3(),channel3() i :=0 j :=0 for{ i++ fmt.Println("循环:",i) //select可以在channel为nil的时候就去接收数据, //哪个channel有数据发送过来就进行接收 select { case n:= <- c1: fmt.Println("来自c1:",n) case n:= <- c2: fmt.Println("来自c2:",n) default: j++ fmt.Println("无数据可接收",j) } } } 输出,12755次循环中有12635次走了无数据 循环: 12753 无数据可接收 12633 循环: 12754 无数据可接收 12634 循环: 12755 无数据可接收 12635 无default package main import ( "fmt" ) func channel3() chan int{ out := make(chan int) go func(){ i := 0 for { out <- i i++ } }() return out } func main(){ var c1,c2 = channel3(),channel3() i :=0 j :=0 t :=0 for{ i++ fmt.Println("循环:",i) t = i - j fmt.Println("来自c1:",t) //select可以在channel为nil的时候就去接收数据, //哪个channel有数据发送过来就进行接收 select { case n:= <- c1: fmt.Println("来自c1:",n) case n:= <- c2: j++ fmt.Println("来自c2:",n) } } } 输出 循环: 10218 来自c1: 4132 来自c2: 6086 循环: 10219 来自c1: 4132 来自c2: 6087 循环: 10220 来自c1: 4132 来自c2: 6088 循环: 10221 来自c1: 4132 来自c2: 6089
package main import ( "fmt" "time" ) func ch1() chan int{ fmt.Println("接收程序开始") c1 := make(chan int) go func(){ i:=0 for { c1 <- i } }() time.Sleep(time.Millisecond) return c1 } func main(){ a,b,i := 0,0,0 c1,c2 := ch1(),ch1() for{ i++ select{ case <- c1: a++ case <- c2: b++ } d := a+b fmt.Println("all:",i,d,a,b) } fmt.Println("主程序结束") } all: 74333 74333 36912 37421 all: 74334 74334 36913 37421 all: 74335 74335 36913 37422 all: 74336 74336 36914 37422 all: 74337 74337 36914 37423 all: 74338 74338 36914 37424 all: 74339 74339 36915 37424 all: 74340 74340 36915 37425 all: 74341 74341 36916 37425 all: 74342 74342 36917 37425 all: 74343 74343 36917 37426 all: 74344 74344 36918 37426 all: 74345 74345 36918 37427 all: 74346 74346 36918 37428 有default时,select几乎99%都选择了default,这说明每一次循环(每一个时间片)中,有数据的情况只是占这个时间片的一小部分,是极小的一部分,无数据的情况占绝大部分(12635/12755); 无default时,select在多个channel中的选择机会基本均等(36918/37428),并且,总的循环次数=所有case条件执行次数之和, 这意味着没有任何一次for循环是轮空的,尽管case有数据的情况只是占用一次循环时间中极小的一部分; 意味着每一次的for循环,即每一次的select执行,必会等待一个case满足条件,本次select才会结束; 意味着select是阻塞的,即至少有一个case满足条件时,select才会结束,否则就等待 再次验证一下 package main import ( "fmt" "time" ) func ch1() chan int{ fmt.Println("接收程序开始") c1 := make(chan int) go func(){ i:=0 for { c1 <- i time.Sleep(time.Millisecond*3000) } }() return c1 } func main(){ a,b,i := 0,0,0 c1,c2 := ch1(),ch1() for{ i++ select{ case <- c1: a++ case <- c2: b++ } d := a+b fmt.Println("all:",i,d,a,b) } fmt.Println("主程序结束") } 每发送一次数据行,等待3秒;select每次等待3秒输出 接收程序开始 接收程序开始 all: 1 1 0 1 all: 2 2 1 1 all: 3 3 2 1 all: 4 4 2 2 all: 5 5 3 2 all: 6 6 3 3 all: 7 7 4 3 all: 8 8 4 4
如果两个channel同时到达select,那么select二选一,即随机选择一个优先输出 如果不同时到达,则谁先达就选择谁,代码如下 package main import ( "fmt" "math/rand" "time" ) func channel3() chan int{ out := make(chan int) go func(){ i := 0 for { time.Sleep(time.Duration(rand.Intn(1000))* time.Millisecond) out <- i i++ } }() return out } func main(){ var c1,c2 = channel3(),channel3() for{ //select可以在channel为nil的时候就去接收数据, //哪个channel有数据发送过来就进行接收 select { case n:= <- c1: fmt.Println("来自c1:",n) case n:= <- c2: fmt.Println("来自c2:",n) } } } 输出 来自c2: 0 来自c1: 0 来自c2: 1 来自c1: 1 来自c2: 2 来自c1: 2 来自c2: 3 来自c1: 3 来自c2: 4 来自c1: 4 来自c2: 5 来自c1: 5 来自c1: 6 来自c2: 6 来自c2: 7 来自c2: 8 来自c1: 7 来自c1: 8 前面有说,一个channel必须先定义接收处理的程序,才能开始发送数据,这而代码却是先发送数据,之后才是select接收,这不是与之前所说的相矛盾吗? 那来验证一下,使用最简单的方式先发送再让select接收试一试 package main import ( "fmt" //"math/rand" //"time" ) /* func channel3() chan int{ out := make(chan int) go func(){ i := 0 for { time.Sleep(time.Duration(rand.Intn(1000))* time.Millisecond) out <- i i++ } }() return out } */ func main(){ //var c1,c2 = channel3(),channel3() var c1,c2 = make(chan int),make(chan int) c1 <- 1 c2 <- 2 for{ //select可以在channel为nil的时候就去接收数据, //哪个channel有数据发送过来就进行接收 select { case n:= <- c1: fmt.Println("来自c1:",n) case n:= <- c2: fmt.Println("来自c2:",n) } } } 输出错误deadlock fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]: main.main() /usr/local/automng/src/goapp/src/test/channel3/chan3.go:27 +0x92 exit status 2 那为什么开始的代码没有报错? channel是goroutine与goroutine之间的通信这是因为发送方位于一个goroutine中(go func(){...}())中,goroutine处理了这个问题, 前面有程序中只有一个go,也就是只有一个goroutine,为什么也能运行? 这是因为go程序中,main方法本身也是一个goroutine 下面的代码就是正确的 package main import ( "fmt" //"math/rand" //"time" ) /* func channel3() chan int{ out := make(chan int) go func(){ i := 0 for { time.Sleep(time.Duration(rand.Intn(1000))* time.Millisecond) out <- i i++ } }() return out } */ func main(){ //var c1,c2 = channel3(),channel3() var c1,c2 = make(chan int),make(chan int) go func(){ for { c1 <- 1 c2 <- 2 //time.Sleep(time.Millisecond) } }() for{ //select可以在channel为nil的时候就去接收数据, //哪个channel有数据发送过来就进行接收 select { case n:= <- c1: fmt.Println("来自c1:",n) case n:= <- c2: fmt.Println("来自c2:",n) } } } 输出,不断地发送1和2,然后又不断地输出1和2 来自c2: 2 来自c1: 1 来自c2: 2 来自c1: 1 来自c2: 2 来自c1: 1 来自c2: 2 来自c1: 1 来自c2: 2 来自c1: 1 来自c2: 2 来自c1: 1 来自c2: 2 来自c1: 1 来自c2: 2 来自c1: 1 来自c2: 2 来自c1: 1 来自c2: 2 来自c1: 1 这里的输出与之前的输出又不一样,之前的输出一片c1或c2,而这里是c1与c2交替式输出,一个c1下来接着就是c2,这样的区别很明显;下面的这段话是个人推测,不保证百分之百是这样哈 首先,在计算机中我们看到一个功能正在运行,比如复制一个电影,但实际上“一个”CPU还在同时做很多工作,比如你还打开了一个网页在看视频,只是这个CPU处理任务的速度非常快,至少在我们看来,这两个工作都在进行着;但对计算机来说,这些工作是在不同的时间片中完成的,同时IO流也在不同的设备上快速切换着。 GO程序是非抢占式的,就是GO程序拿到CPU资源之后,自己不释放别人不可以抢走;GO中也有可以抢占式的程序,即别人可以抢走程序执行的控制权,比如IO操作(fmt打印就是输出IO到屏幕),这种操作会中断GO对资源的占用,我们可以认为这是系统需要这样,而GO是运行在系统上才不得不这样;比如一个GO主程序正在执行代码,到fmt的时候,在一些时间片上系统抢了GO程序的资源干了别的事,然后又把资源还给了GO程序。 channel操作也是非抢占式的,先给哪个channel发送数据,那么这个channel如果不被中断,那么它就会先输出,先输出就会被select调度先输出;for中先c1,然后 fmt中断一次,再c2再fmt中断一次,然后下一轮for循环,所以保持了c1,c2,c1,c2……的输出顺序 而成片输出c1或c2的代码中,channel的创建是在方法中,数据发送则是在两个不同的goroutine中,这两个goroutine是并行执行,而不像for里面c1与c2, for { c1 <- 1 c2 <- 2 //time.Sleep(time.Millisecond) } 程序必定是先c1再c2这么有顺序,下面将变量i变成全局变量,再次执 |
请发表评论