Go并发编程
目录
本文是作者学习Go并发编程的笔记总结,主要内容是Go并发编程的示例代码,下面是与本文相关的链接:
本文中包含的代码段的完整代码可以去作者的Github下载
说明:
在Go并发编程的学习过程中,除了具体如何使用Go实现并发编程外,还包括进程、线程、协程、生产者消费者模型、互斥量、锁、条件变量等,下文并不会详细说明这些概念,如果有想要详细了解这些内容,可以去看Unix系统编程和Unix网络编程这两本书。
Go程
Go在语言级别支持协程,叫goroutine。Go语言标准库提供的所有系统调用操作(包括所有同步IO操作),都会出让CPU给其他goroutine。这让轻量级线程的切换管理不依赖于系统的线程和进程,也不需要依赖于CPU的核心数量。
Go程的创建与使用
创建时只需要使用关键字 go
func sing() {
for i := 0; i < 5; i++ {
fmt.Println("Sing something...")
time.Sleep(100 * time.Millisecond)
}
}
func dance() {
for i := 0; i < 5; i++ {
fmt.Println("Someone dancing...")
time.Sleep(100 * time.Millisecond)
}
}
func main() {
fmt.Println("顺序执行")
sing()
dance()
// 并发执行
fmt.Println("并发执行")
go sing()
go dance()
for {
;
}
}
Go程使用的相关函数说明:
Gosched()、GOMAXPROCS()
// runtime.Gosched() 出让当前go程所占用的cpu时间片
// runtime.Goexit() 结束调用该函数的当前go程
func main() {
go func() {
for i := 0; i < 10; i++ {
fmt.Println("this is a goroutine test")
//time.Sleep(100 * time.Microsecond)
}
}()
for {
runtime.Gosched()
fmt.Println("this is a main test")
//time.Sleep(100 * time.Microsecond)
}
}
func main() {
// 创建一个子go程
go func () {
for i := 0; i < 5; i++ {
fmt.Println("-----I am Goroutine-----")
time.Sleep(time.Second)
}
}()
// 主Goroutine
for i := 0; i < 5; i++ {
fmt.Println("-----I am main-----")
time.Sleep(time.Second)
if i == 2 {
break
}
}
// 主Goroutine退出 子go程也会退出
/*
-----I am main-----
-----I am Goroutine-----
-----I am Goroutine-----
-----I am main-----
-----I am main-----
-----I am Goroutine-----
-----I am Goroutine-----
*/
// runtime.Gosched() 出让当前go程所占用的cpu时间片
}
// runtime.GOMAXPROCS(n) 设置当前进程使用的最大cpu内核数 返回上一次调用成功的设置值 首次调用返回默认值
func main() {
n := runtime.GOMAXPROCS(2)// 将CPU设为双核
fmt.Println(n)
for {
go fmt.Print(0)// 子go程
fmt.Print(1)// 主go程
}
}
Channel
channel是一种数据类型(管道),主要用于解决go程同步问题以及协程之间数据共享的问题。
特点:一端写一端读
Channel的定义与使用
/*
make(chan 在channel中传递的数据类型, 容量)
容量为0表示无缓冲
容量大于0表示有缓冲
*/
// 全局定义channel 完成数据同步
var channel = make(chan int)
func printer(s string) {
for _, ch := range s {
fmt.Printf("%c", ch)
time.Sleep(300 * time.Millisecond)
}
}
// 先执行
func person1() {
printer("Hello")
channel <- 1// 向channel写数据 如果写的数据没有被读走 channel阻塞
}
// 后执行
func person2() {
<- channel// 从channel读
printer("World")
}
func main() {
go person1()
go person2()
// 输出WHeorllldo person1 person2 交替使用标准输出 导致输出结果乱序
for {
;
}
}
Channel同步传递数据
func main() {
ch := make(chan string)
// len() 得到channel中剩余未读取数据个数
// cap() 得到通道容量
//fmt.Println("len(ch) =", len(ch), "cap(ch) =", cap(ch))
go func () {
for i := 0; i < 5; i++ {
fmt.Println("i = ", i)
}
// 通知主go打印完毕
ch <- "Completed..."
fmt.Println("len(ch) =", len(ch), "cap(ch) =", cap(ch))
}()
//fmt.Println("len(ch) =", len(ch), "cap(ch) =", cap(ch))
str := <- ch
//fmt.Println("len(ch) =", len(ch), "cap(ch) =", cap(ch))
fmt.Println("主go", str)
}
无缓冲Channel和有缓冲Channel
func main() {
ch := make(chan int)
go func() {
for i := 0; i < 5; i++ {
fmt.Println("子go程写, i =", i)
ch <- i
}
}()
//time.Sleep(time.Second * 2)
for i := 0; i < 5; i++ {
num := <- ch
fmt.Println("主go程读,i = ", num)
}
}
func main() {
ch := make(chan int, 3)// 存满3个元素之前不会阻塞
fmt.Println("len =", len(ch), "cap =", cap(ch))
go func() {
for i := 0; i < 5; i++ {
ch <- i
fmt.Println("子go程:", i)
fmt.Println("len =", len(ch), "cap =", cap(ch))
}
}()
time.Sleep(time.Second * 3)
for i := 0; i < 5; i++ {
num := <- ch
fmt.Println("主go程:", num)
fmt.Println("len =", len(ch), "cap =", cap(ch))
}
}
Channel的关闭
func main() {
ch := make(chan int)
go func() {
for i := 0; i < 8; i++ {
ch <- i
}
close(ch)// 写端写完数据 主动关闭channel
}()
for {
// 检测对端是否关闭
if num, ok := <- ch; ok == true {// ok == true, 读到数据
fmt.Println("Read num =", num)
} else {// channel已经关闭
break
}
}
// 或者换种写法
// for num := range ch {}
}
/*
数据未发送完不应该关闭channel
无缓冲channel 无法向已经关闭的channel中写数据 但是还可以读
*/
单向Channel的使用
有时在函数中,我们只需要从channel中读取数据或者写入数据,这时我们可以使用单向channel
/*
func main() {
// 双向channel 默认
ch := make(chan int)
var sendCh chan <- int// 单向写channel
// 可以将双向channel转换为单向channel 但是反之不行
sendCh = ch
sendCh <- 754
// 出错 单向写channel不能读
//num := <- sendCh
var recvCh <- chan int = ch// 单向读channel
num := <- recvCh
fmt.Println(num)
// 反向赋值 出错
//var ch2 chan int = sendCh
}
*/
func send(out chan <- int) {
out <- 88
close(out)
}
func recv(in <- chan int) {
n := <- in
fmt.Println("Recv num =", n)
}
func main() {
ch := make(chan int)// 双向channel
go func(){
send(ch)// 双向channel转为写channel
}()
recv(ch)
}
使用Channel实现生产者消费者模型
生产者:发送端
消费者:接收端
缓冲区作用:
解耦(降低生产者与消费者之间的耦合度)
并发(生产者与消费者数量不对等时 能保持正常通信)
缓存(生产者与消费者数据处理速度不一致时 暂存数据)
func producer(out chan <- int) {
for i := 0; i < 10; i++ {
fmt.Println("producer send", i * i)
out <- i * i
}
close(out)
}
func consumer(in <- chan int) {
for num := range in {
fmt.Println("consumer recv", num)
}
}
func main() {
// 无缓冲channel实现生产者消费者
//ch := make(chan int)
// 有缓冲
ch := make(chan int, 5)
go producer(ch)// 子go程作为生产者
consumer(ch)
}
定时器
创建定时器
func main() {
fmt.Println("Now time:", time.Now())
// 创建定时器
myTimer := time.NewTimer(time.Second * 2)
nowTime := <- myTimer.C// 当系统向定时器中的channel写完后 再从中读
fmt.Println("Now time:", nowTime)
}
/*
time.Timer
定时器,由channel实现,当设定的时间到达时,系统会向定时器中的channel写
type Timer struct {
C <- chan Time
r runtimeTimer
}
*/
定时的3种方式
// 定时器的停止和重置
func main() {
myTimer := time.NewTimer(time.Second * 3)
myTimer.Reset(1 * time.Second)// 重置定时器
go func() {
<- myTimer.C
fmt.Println("子go程读取定时完毕")
}()
//myTimer.Stop()// 设置定时器停止 子go程无法从定时器读到任何数据
for {
;
}
}
周期定时
func main() {
// 创建一个是否终止的channel
quit := make(chan bool)
fmt.Println("now:", time.Now())
// 周期定时 每隔一秒 系统会向Ticker.C写一次
myTicker := time.NewTicker(time.Second)
i := 0
go func() {
for {
nowTime := <- myTicker.C
i++
fmt.Println("nowTime:", nowTime)
// 定时器循环了五次后 向quit写数据
// 主go程从quit读到数据后 程序退出
if i == 5 {
quit <- true
}
}
}()
<- quit
}
select
使用select监听每个channel
select的使用
func main() {
ch := make(chan int)// 用于数据通信的channel
quit := make(chan bool)// 用于判断是否退出的channel
// 子go写数据
go func() {
for i := 0; i < 5; i++ {
ch <- i
time.Sleep(time.Second)
}
close(ch)// ch 虽然关闭 但是还可以读到0
quit <- true
}()
// 主go读数据
for {
// select下的case中 若果某个case可读 则执行
// 如果所有case都不可读 则阻塞在select
// case中有多个满足监听条件 任选一个执行
// 可以使用default来处理所有case都不满足监听条件的状况 通常不会这么使用 会产生忙等待
// select自身不带有循环机制 需要借助外层for循环来监听
// break只能跳出select
select {
case num := <- ch:
fmt.Println("Read:", num)
case <- quit:// quit 可读 退出for
fmt.Println("quit")
// break跳出的是select
//break
return
}
// select执行后执行
fmt.Println("-----------------")
}
}
select超时处理
func main() {
ch := make(chan int)
quit := make(chan bool)
go func() {
for {
select {
case num := <- ch:
fmt.Println("Read:", num)
case <- time.After(3 * time.Second):// 超过3秒还没读到数据
quit <- true
}
}
}()
for i := 0; i < 5; i++ {
ch <- i
time.Sleep(time.Second)
}
<- quit
fmt.Println("quit")
}
同步相关
使用channel产生死锁
// 单go程自己死锁
// channel应该在至少两个以上go程中进行通信 否则死锁
func main1() {
ch := make(chan int)
// fatal error: all goroutines are asleep - deadlock
ch <- 748// 程序死锁 卡在这一步 等待ch被读取 而不会执行下面读取ch的那一步
num := <- ch
fmt.Println("Read:", num)
}
// go程间channel访问顺序导致死锁
// 使用channel时 读写两端要有同时有机会执行
func main2() {
ch := make(chan int)
num := <- ch// 死锁 等待读 导致子go程不会执行 即写操作不会执行
fmt.Println("Read:", num)
go func() {
ch <- 789
}()
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
//
go func() {
for {
select {
case num := <- ch1:
ch2 <- num
}
}
}()
for {
select {
case num := <- ch2:
ch1 <- num
}
}
}
互斥锁
// 借助channel完成数据同步
//var ch := make(chan int)
// 通过锁完成数据同步
var mutex sync.Mutex// 创建互斥锁 新建互斥锁状态为未加锁0
func printer(str string) {
mutex.Lock()// 访问共享数据之前加锁
for _, ch := range str {
fmt.Printf("%c", ch)
time.Sleep(time.Millisecond * 300)
}
mutex.Unlock()// 共享数据访问结束 解锁
}
func person1() {
printer("Hello")
//ch <- 111
}
func person2() {
//<- ch
printer("World")
}
func main() {
go person1()
go person2()
for {
;
}
}
读写锁
读写锁的使用
// 读写锁
//var rwMutex sync.RWMutex
// 在go中尽量不要将互斥锁 读写锁 与 channel混用 可能造成隐形死锁
// 下面程序会死锁
// 不使用channel 而是用全局变量
func readGo(in <- chan int, idx int) {
for {
rwMutex.RLock()// 读 加锁
num := <- in
fmt.Println("Id", idx, "Read", num)
rwMutex.RUnlock()// 读 解锁
}
}
func writeGo(out chan <- int, idx int) {
for {
// 生成随机数
num := rand.Intn(1000)
rwMutex.Lock()// 写 加锁
out <- num
fmt.Println("Id", idx, "Write", num)
//time.Sleep(time.Millisecond * 300)
rwMutex.Unlock()
time.Sleep(time.Millisecond * 300)
}
}
func main() {
// 随机数种子
rand.Seed(time.Now().UnixNano())
ch := make(chan int)
//quit := make(chan bool)
// 5个读go程 5个写go程
for i := 0; i < 5; i++ {
go readGo(ch, i)
}
for i := 0; i < 5; i++ {
go writeGo(ch, i)
}
//<- quit
for {
;
}
}
// 使用全局变量
var value int// 定义全局变量 模拟共享数据
func readGo(idx int) {
for {
rwMutex.RLock()// 读 加锁
num := value
fmt.Println("Id", idx, "Read", num)
time.Sleep(time.Millisecond * 300)
rwMutex.RUnlock()// 读 解锁
}
}
func writeGo(idx int) {
for {
// 生成随机数
num := rand.Intn(1000)
rwMutex.Lock()// 写 加锁
value = num
fmt.Println("Id", idx, "Write", num)
time.Sleep(time.Millisecond * 300)
rwMutex.Unlock()
}
}
func main() {
// 随机数种子
rand.Seed(time.Now().UnixNano())
//ch := make(chan int)
//quit := make(chan bool)
// 5个读go程 5个写go程
for i := 0; i < 5; i++ {
go readGo(i)
}
for i := 0; i < 5; i++ {
go writeGo(i)
}
//<- quit
for {
;
}
}
使用channel模拟读写锁
var value int
func readGo(in <- chan int, idx int) {
for {
num := <- in
fmt.Println("Id", idx, "Read", num)
time.Sleep(time.Millisecond * 300)
}
}
func writeGo(out chan <- int, idx int) {
for {
// 生成随机数
num := rand.Intn(1000)
out <- num
fmt.Println("Id", idx, "Write", num)
time.Sleep(time.Millisecond * 300)
}
}
func main() {
// 随机数种子
rand.Seed(time.Now().UnixNano())
ch := make(chan int)
//quit := make(chan bool)
// 5个读go程 5个写go程
for i := 0; i < 5; i++ {
go readGo(ch, i)
}
for i := 0; i < 5; i++ {
go writeGo(ch, i)
}
//<- quit
for {
;
}
}
条件变量
var cond sync.Cond// 全局条件变量
func producer(out chan <- int, idx int) {
for {
// 加锁
cond.L.Lock()
// 判断缓冲区是否满
for len(out) == 5 {
cond.Wait()// 等待缓冲区有位置可写
}
num := rand.Intn(800)
out <- num
fmt.Println("Idx", idx, "Write", num)
// 解锁
cond.L.Unlock()
// 唤醒对端 即消费者
cond.Signal()
time.Sleep(time.Millisecond * 200)
}
}
func consumer(in <- chan int, idx int) {
for {
cond.L.Lock()
for len(in) == 0 {
cond.Wait()
}
num := <- in
fmt.Println("idx", idx, "Read", num)
cond.L.Unlock()
cond.Signal()
time.Sleep(time.Millisecond * 200)
}
}
func main() {
ch := make(chan int, 5)
//quit := make(chan int)
rand.Seed(time.Now().UnixNano())
// 指定条件变量使用的锁
cond.L = new(sync.Mutex)
for i := 0; i < 5; i++ {
go producer(ch, i)
}
for i := 0; i < 5; i++ {
go consumer(ch, i)
}
//<- quit
for {
;
}
}
请发表评论