作者:陈皓
如果你还不了解Go语言的语法,还请你移步先看一下上篇——《Go语言简介(上):语法》
goroutine
GoRoutine主要是使用go关键字来调用函数,你还可以使用匿名函数,如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
package main
import "fmt"
func f(msg string) {
fmt.Println(msg)
}
func main(){
go f( "goroutine" )
go func(msg string) {
fmt.Println(msg)
}( "going" )
}
|
我们再来看一个示例,下面的代码中包括很多内容,包括时间处理,随机数处理,还有goroutine的代码。如果你熟悉C语言,你应该会很容易理解下面的代码。
你可以简单的把go关键字调用的函数想像成pthread_create。下面的代码使用for循环创建了3个线程,每个线程使用一个随机的Sleep时间,然后在routine()函数中会输出一些线程执行的时间信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
package main
import "fmt"
import "time"
import "math/rand"
func routine(name string, delay time.Duration) {
t0 := time.Now()
fmt.Println(name, " start at " , t0)
time.Sleep(delay)
t1 := time.Now()
fmt.Println(name, " end at " , t1)
fmt.Println(name, " lasted " , t1.Sub(t0))
}
func main() {
rand.Seed(time.Now().Unix())
var name string
for i:= 0 ; i< 3 ; i++{
name = fmt.Sprintf( "go_%02d" , i)
go routine(name, time.Duration(rand.Intn( 5 )) * time.Second)
}
var input string
fmt.Scanln(&input)
fmt.Println( "done" )
}
|
运行的结果可能是:
1
2
3
4
5
6
7
8
9
|
go_00 start at 2012-11-04 19:46:35.8974894 +0800 +0800
go_01 start at 2012-11-04 19:46:35.8974894 +0800 +0800
go_02 start at 2012-11-04 19:46:35.8974894 +0800 +0800
go_01 end at 2012-11-04 19:46:36.8975894 +0800 +0800
go_01 lasted 1.0001s
go_02 end at 2012-11-04 19:46:38.8987895 +0800 +0800
go_02 lasted 3.0013001s
go_00 end at 2012-11-04 19:46:39.8978894 +0800 +0800
go_00 lasted 4.0004s
|
goroutine的并发安全性
关于goroutine,我试了一下,无论是Windows还是Linux,基本上来说是用操作系统的线程来实现的。不过,goroutine有个特性,也就是说,如果一个goroutine没有被阻塞,那么别的goroutine就不会得到执行。这并不是真正的并发,如果你要真正的并发,你需要在你的main函数的第一行加上下面的这段代码:
1
2
3
|
import "runtime"
...
runtime.GOMAXPROCS(4)
|
还是让我们来看一个有并发安全性问题的示例(注意:我使用了C的方式来写这段Go的程序)
这是一个经常出现在教科书里卖票的例子,我启了5个goroutine来卖票,卖票的函数sell_tickets很简单,就是随机的sleep一下,然后对全局变量total_tickets作减一操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package main
import "fmt"
import "time"
import "math/rand"
import "runtime"
var total_tickets int32 = 10 ;
func sell_tickets(i int ){
for {
if total_tickets > 0 {
time.Sleep( time.Duration(rand.Intn( 5 )) * time.Millisecond)
total_tickets--
fmt.Println( "id:" , i, " ticket:" , total_tickets)
} else {
break
}
}
}
func main() {
runtime.GOMAXPROCS( 4 )
rand.Seed(time.Now().Unix())
for i := 0 ; i < 5 ; i++ {
go sell_tickets(i)
}
var input string
fmt.Scanln(&input)
fmt.Println(total_tickets, "done" )
}
|
这个程序毋庸置疑有并发安全性问题,所以执行起来你会看到下面的结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
$go run sell_tickets.go
id : 0 ticket: 9
id : 0 ticket: 8
id : 4 ticket: 7
id : 1 ticket: 6
id : 3 ticket: 5
id : 0 ticket: 4
id : 3 ticket: 3
id : 2 ticket: 2
id : 0 ticket: 1
id : 3 ticket: 0
id : 1 ticket: -1
id : 4 ticket: -2
id : 2 ticket: -3
id : 0 ticket: -4
-4 done
|
可见,我们需要使用上锁,我们可以使用互斥量来解决这个问题。下面的代码,我只列出了修改过的内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package main
import "fmt"
import "time"
import "math/rand"
import "sync"
import "runtime"
var total_tickets int32 = 10 ;
var mutex = &sync.Mutex{}
func sell_tickets(i int ){
for total_tickets> 0 {
mutex.Lock()
if total_tickets > 0 {
time.Sleep( time.Duration(rand.Intn( 5 )) * time.Millisecond)
total_tickets--
fmt.Println(i, total_tickets)
}
mutex.Unlock()
}
}
.......
......
|
原子操作
说到并发就需要说说原子操作,相信大家还记得我写的那篇《无锁队列的实现》一文,里面说到了一些CAS – CompareAndSwap的操作。Go语言也支持。你可以看一下相当的文档
我在这里就举一个很简单的示例:下面的程序有10个goroutine,每个会对cnt变量累加20次,所以,最后的cnt应该是200。如果没有atomic的原子操作,那么cnt将有可能得到一个小于200的数。
下面使用了atomic操作,所以是安全的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
package main
import "fmt"
import "time"
import "sync/atomic"
func main() {
var cnt uint32 = 0
for i := 0 ; i < 10 ; i++ {
go func() {
for i:= 0 ; i< 20 ; i++ {
time.Sleep(time.Millisecond)
atomic.AddUint32(&cnt, 1 )
}
}()
}
time.Sleep(time.Second)
cntFinal := atomic.LoadUint32(&cnt)
fmt.Println( "cnt:" , cntFinal)
}
|
这样的函数还有很多,参看go的atomic包文档(被墙)
Channel 信道
Channal是什么?Channal就是用来通信的,就像Unix下的管道一样,在Go中是这样使用Channel的。
下面的程序演示了一个goroutine和主程序通信的例程。这个程序足够简单了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
package main
import "fmt"
func main() {
channel := make(chan string)
go func() { channel <- "hello" }()
msg := <- channel
fmt.Println(msg)
}
|
指定channel的buffer
指定buffer的大小很简单,看下面的程序:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
package main
import "fmt"
func main() {
channel := make(chan string, 2 )
go func() {
channel <- "hello"
channel <- "World"
}()
msg1 := <-channel
msg2 := <-channel
fmt.Println(msg1, msg2)
}
|
Channel的阻塞
注意,channel默认上是阻塞的,也就是说,如果Channel满了,就阻塞写,如果Channel空了,就阻塞读。于是,我们就可以使用这种特性来同步我们的发送和接收端。
下面这个例程说明了这一点,代码有点乱,不过我觉得不难理解。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
package main
import "fmt"
import "time"
func main() {
channel := make(chan string)
go func() {
channel <- "hello"
fmt.Println( "write \"hello\" done!" )
channel <- "World"
fmt.Println( "write \"World\" done!" )
fmt.Println( "Write go sleep..." )
time.Sleep( 3 *time.Second)
channel <- "channel"
fmt.Println( "write \"channel\" done!" )
}()
time.Sleep( 2 *time.Second)
fmt.Println( "Reader Wake up..." )
msg := <-channel
fmt.Println( "Reader: " , msg)
msg = <-channel
fmt.Println( "Reader: " , msg)
msg = <-channel
fmt.Println( "Reader: " , msg)
}
|
上面的代码输出的结果如下:
1
2
3
4
5
6
7
8
|
Reader Wake up...
Reader: hello
write "hello" done !
write "World" done !
Write go sleep ...
Reader: World
write "channel" done !
Reader: channel
|
Channel阻塞的这个特性还有一个好处是,可以让我们的goroutine在运行的一开始就阻塞在从某个channel领任务,这样就可以作成一个类似于线程池一样的东西。关于这个程序我就不写了。我相信你可以自己实现的。
多个Channel的select
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package main
import "time"
import "fmt"
func main() {
c1 := make(chan string)
c2 := make(chan string)
go func() {
time.Sleep(time.Second * 1 )
c1 <- "Hello"
}()
go func() {
time.Sleep(time.Second * 1 )
c2 <- "World"
}()
for i := 0 ; i < 2 ; i++ {
select {
case msg1 := <-c1:
fmt.Println( "received" , msg1)
case msg2 := <-c2:
fmt.Println( "received" , msg2)
}
}
}
|
注意:上面的select是阻塞的,所以,才搞出ugly的for i <2这种东西。
Channel select阻塞的Timeout
解决上述那个for循环的问题,一般有两种方法:一种是阻塞但有timeout,一种是无阻塞。我们来看看如果给select设置上timeout的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
timeout_cnt := 0
for {
select {
case msg1 := <-c1:
fmt.Println( "msg1 received" , msg1)
case msg2 := <-c2:
fmt.Println( "msg2 received" , msg2)
case <- time .After( time .Second * 30):
fmt.Println( "Time Out" )
timout_cnt++
}
if time_cnt > 3 {
break
}
}
|
上面代码中高亮的代码主要是用来让select返回的,注意 case中的time.After事件。
Channel的无阻塞
好,我们再来看看无阻塞的channel,其实也很简单,就是在select中加入default,如下所示:
1
2
3
4
5
6
7
8
9
10
11
|
for {
select {
case msg1 := <-c1:
fmt.Println( "received" , msg1)
case msg2 := <-c2:
fmt.Println( "received" , msg2)
default :
fmt.Println( "nothing received!" )
time .Sleep( time .Second)
}
}
|
Channel的关闭
关闭Channel可以通知对方内容发送完了,不用再等了。参看下面的例程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package main
import "fmt"
import "time"
import "math/rand"
func main() {
channel := make(chan string)
rand.Seed(time.Now().Unix())
go func () {
cnt := rand.Intn( 10 )
fmt.Println( "message cnt :" , cnt)
for i:= 0 ; i<cnt; i++{
channel <- fmt.Sprintf( "message-%2d" , i)
}
close(channel)
|
|
请发表评论