在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
协程是非常轻量级的,kb级,而线程是重量级的,默认是1M。 java中线程与系统线程是1:1的关系,而go中协程与系统线程是m:n的关系。KSE,kernel Space Entity。 go新建一个协程的方式就是用go关键字,示例: func main() {
start := time.Now()
for i := 0; i < 3; i++ {
go func() {
time.Sleep(time.Second * 5)
}()
}
time.Sleep(time.Second * 10)
fmt.Println(time.Since(start))
}
总耗时10s多一点点,而不是20s,说明那5s确实是在子协程中sleep的。 在sync包中有很多实用的工具,如 sync.Mutex,它相当于java中的ReentrantLock,有lock()和unlock()方法。用法示例: func main() {
counter := 0
start := time.Now()
mutex := new(sync.Mutex)
for i := 0; i < 50000; i++ {
go func() {
defer func() {
mutex.Unlock()
}()
mutex.Lock()
counter++
}()
}
time.Sleep(time.Second * 1)
fmt.Println(counter, ",", time.Since(start))
}
在defer中调用unlock方法。 sync.RWMutex,它相当于java中的ReentrantReadWriteLock,内部有一个读锁和写锁。用法同sync.Mutex。 sync.WaitGroup,它相当于java中的CountDownLatch,主协程等待一堆子协程执行完之后才继续执行。用法示例: func main() {
counter := 0
mutex := new(sync.Mutex)
start := time.Now()
var waitGroup = new(sync.WaitGroup)
for i := 0; i < 50000; i++ {
go func() {
waitGroup.Add(1)
defer func() {
mutex.Unlock()
}()
mutex.Lock()
counter++
waitGroup.Done()
}()
}
waitGroup.Wait()
fmt.Println(counter, ",", time.Since(start))
}
这50000个子任务实际只花了不到20ms,不用苦等1s那么久。 sync.Mutex、sync.RWMutex、sync.WaitGroup都是基于共享内存的并发机制,这种机制是在编程语言中比较通用的机制,java也有。go不推荐使用共享内存机制,而是推荐使用CSP并发模型机制。 CSP全称是Communicating Sequential Processes,可以翻译成通信顺序进程(Communicating翻译成通信的意思),在1977年被一位英国计算机科学家提出(这个科学家还发明了快排)。简单解释就是,CSP模型由并发执行的实体组成,实体之间通过发送消息进行通信,发送消息时使用的是通道。CSP模型的关键是通道,而不是发送消息的实体。口诀是Do not communicate by sharing memory,instead,share memory by communicating。不要以共享内存的方式来通信,相反,要通过通信来共享内存。 go的CSP并发模型,是通过goroutine和通道channel实现的。goroutine是并发执行的实体,底层使用协程(coroutine)实现并发。coroutine运行在用户态,从而避免了内核态和用户态的切换导致的成本。 channel可分为两种: 一种是普通channel,生产者往通道中放数据时必须有消费者从通道中取,不然生产者会阻塞;同样的,消费者从通道中取数据时,如果没有生产者往通道中放数据,消费者也会阻塞。 另一种是buffered channel,创建时要指定通道的大小,在未达到指定大小时,生产者可以任意往通道中放数据,不会阻塞,直到达到指定大小后,阻塞。 建channel也是用make关键字: 建普通channel,语法是make(chan type)或者make(chan type, 0),如ch := make(chan string),通道想放什么类型的数据都行,如int、string,甚至可以是interface{}。 建buffered channel,语法是make(chan type, value),value是个大于0的整数,如ch := make(chan string, 10),即只需要在make函数中添加第二个参数,指定channel的大小。 往通道中放数据、从通道中取数据,都要用到一个特殊的操作符<-,小于号后面跟一个中横线,好像一个左箭头,箭头的指向就是数据的流向,往通道中放数据,箭头要指向通道,通道<-,从通道中取数据,箭头要背向通道,<-通道。 func service() string {
time.Sleep(time.Second * 1)
return "Done"
}
func otherTask() {
fmt.Println("working on something else")
time.Sleep(time.Second * 2)
fmt.Println("Task is done")
}
func main() {
start := time.Now()
fmt.Println(service())
otherTask()
fmt.Println("cost", time.Since(start))
}
上例中,service函数需要执行1s,otherTask函数需要执行2s,上面这个程序,会先执行service函数,再执行otherTask函数,总耗时在3s。 现在我们要求总耗时2s,且能够在主协程中获取service函数的返回值。观察otherTask函数和service函数,otherTask函数和service函数的返回值没有关系,所以可以并行执行service函数和otherTask。把service函数另起一个协程执行,那么如何在主协程中获取子协程中的值呢?建个channel就好了,子协程往channel中放,主协程从channel中取。 改造如下: func service() string {
time.Sleep(time.Second * 1)
return "Done"
}
func asyncService(ch chan string) {
go func() {
result := service()
ch <- result
}()
}
func otherTask() {
fmt.Println("working on something else")
time.Sleep(time.Second * 2)
fmt.Println("Task is done")
}
func main() {
start := time.Now()
ch := make(chan string)
asyncService(ch)
otherTask()
result := <-ch
fmt.Println("result= " + result)
fmt.Println("cost", time.Since(start))
}
如上,在主协程中创建了一个channel,在子协程中执行service函数,并将返回值放入主协程创建的channel,在主协程中就可以从channel中取数据了。用的channel是一个普通channel,子协程执行完service函数后,把service函数返回值放到channel时会阻塞,因为otherTask函数还没执行完,主协程还不会从channel中取数据,直到otherTask函数执行完,主协程从channel中取数据,子协程才能把service函数返回值放到channel,主协程取出并使用。 我们还可以优化一下,把子协程解放出来,没必要阻塞一段时间,占用资源。用buffered channel替换普通channel,优化后代码如下: func service() string {
time.Sleep(time.Second * 1)
return "Done"
}
func asyncService(ch chan string) {
go func() {
result := service()
fmt.Println("service执行完毕")
ch <- result
fmt.Println("channel放入完毕")
}()
}
func otherTask() {
fmt.Println("working on something else")
time.Sleep(time.Second * 2)
fmt.Println("Task is done")
}
func main() {
start := time.Now()
ch := make(chan string, 1)
asyncService(ch)
otherTask()
result := <-ch
fmt.Println("result= " + result)
fmt.Println("cost", time.Since(start))
}
只改动了一点,即把ch := make(chan string)改为了ch := make(chan string, 1)
多路选择和超时控制 select case块,语法糖是: select { case ret := <-ch1: fmt.Println("A", ret) case ret := <-ch2: fmt.Println("B", ret) } 示例如下: func main() { ch1 := make(chan string) go handleChanel(ch1) ch2 := make(chan string) go handleChanel(ch2) time.Sleep(time.Second * 1) select { case ret := <-ch1: fmt.Println("A", ret) case ret := <-ch2: fmt.Println("B", ret) default: fmt.Println("DEFAULT") } } func handleChanel(ch chan string) { rd := rand.Intn(10) time.Sleep(time.Second * time.Duration(rd)) ch <- strconv.Itoa(rd) } 如果没有default,那么当执行到select case块时 ,假如所有的case都阻塞,那么当前协程就会阻塞,直到某一个case不阻塞;假如部分case阻塞,那么当前协程会找一个不阻塞的case执行(随机,不一定是先声明的case)。 如果有default,那么当执行到select case块时 ,假如所有的case都阻塞,那么当前协程会执行default;假如部分case阻塞,那么当前协程会找一个不阻塞的case执行(同样是随机的)。 利用多路选择机制,我们可以用来设计超时控制: 语法糖是: select { case ret := <-ch1: fmt.Println("A", ret) case <-time.After(time.Second * 5): fmt.Println("超时") } 示例如下: func main() { ch1 := make(chan string) go handleChanel(ch1) ch2 := make(chan string) go handleChanel(ch2) select { case ret := <-ch1: fmt.Println("A", ret) case <-time.After(time.Second * 5): fmt.Println("超时") } } func handleChanel(ch chan string) { rd := rand.Intn(10) time.Sleep(time.Second * time.Duration(rd)) ch <- strconv.Itoa(rd) } channel关闭 close(ch) 向关闭的channel发送数据,会导致协程panic。 v, ok := <-ch ok为true时,表示channel未关闭,可正常从通道中取数据;ok为false时,表示通道已关闭,取出来的数据为相应数据类型的零值,如0,空字符串,nil等等。 channel关闭的广播机制 所有的channel接收者在channel关闭时都会立刻从阻塞等待中返回,且上述ok值为false。由此,可用于同时向多个订阅者发送信号,如退出信号。 可以用range遍历channel,在遍历前要确保信道处于关闭状态,否则循环会阻塞。 func main() { c := make(chan int, 3) c <- 1 c <- 2 c <- 3 close(c) for k := range c { fmt.Println(k) } }
用sync.Once可以保证某段代码只执行一次,示例如下: func main() { var once sync.Once once.Do(func() { fmt.Println(1) }) once.Do(func() { fmt.Println(2) }) } 我们利用sync.Once可以实现单例模式,也可以用来在高并发下刷新缓存,等等。 所有任务完成之后,才往下执行,这种需求,除了可以用sync.WaitGroup外,还可以用buffered channel,示例如下: func main() { fmt.Println("A") num := 10 ch := make(chan string, num) for i := 0; i < num; i++ { go func(i int) { defer func() { ch <- "" if r := recover(); r != nil { } }() fmt.Println(i) }(i) } for i := 0; i < num; i++ { <-ch } fmt.Println("B") } 推荐使用buffered channel替代sync.WaitGroup。 利用buffered channel还可以实现对象池,如连接池。可以看下gorm的源码,看看gorm的连接池是怎么设计。 sync.Pool的目的是保存和复用临时对象,减少内存分配,减轻GC压力。 用Pool对象的Get()方法取,Put(x interface{})放。 用法示例: func main() { p := &sync.Pool{ New: func() interface{} { fmt.Println("耗时严重") return 0 }, } a := p.Get().(int) p.Put(1) b := p.Get().(int) fmt.Println(a, b) a = p.Get().(int) p.Put(1) runtime.GC() // 手动调用GC b = p.Get().(int) fmt.Println(a, b) } sync.Pool中的元素在GC时会被清空,我们可以调用runtime.GC()手动GC来验证。正常来说,一个服务在运行过程中,GC时间是不确定的,所以sync.Pool中是否有数据也不一定。如果sync.Pool中没有数据,那么Get时会调用创建Pool对象时NEW指定的函数生成数据并返回,第一次Get也是这样。从sync.Pool中get,用完后,别忘了put,否则会一直调用NEW对应的函数生成新数据,这样就达不到复用临时对象,减少内存分配的目的了。 sync.Pool不是只可以放一个数据,而是可以放任意多个数据,get取出的数据是乱序的,所以不能当做一个队列或者栈使用。 sync.Pool在fmt包中的各种打印方法中有使用,如fmt.Println()源码中p := newPrinter(),newPrinter()会从sync.Pool中取出一个可复用的对象,p.free(),就是把复用对象再放到sync.Pool中。 |
请发表评论