go最大的特性就是并发了,所以这一块是go的重点知识,我自己花了一些时间,好好学习这个基础知识。
声明
文章内容为个人学习理解,所以文章如果有不对之处,非常感谢指出。
goroutine
说起go并发,一般都会指go协程,即goroutine ,而实现goroutin 的关键字就是 go 。
我学习go并发用到的关键字总结
- go //异步执行
- chan // make channel 用于各协程间通信 (
重点 )
- ch := make(chan 数据类型, 缓冲区容量) // 格式
- sync.Mutex // 锁
- m.Lock()// 实例化的sync.Mutex对象 即var m sync.Mutex
- m.Unlock()
- sync.Once // 只执行一次函数
- var once sync.Once once.DO(func(){})
- sync.WaitGroup // 等待所有协程执行完
- wg.Add(协程数量) // 添加执行的协程数量
- wg.Done() // 每执行完一个单元协程就执行一次Done 把上面添加的数量 -1
- wg.Wait() // 等待所有的协程执行完
- runtime.NumGoroutine() // 查看当前运行的协程数量
最基础的一个demo
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("Hello World")
// 异步执行协程函数
go func() {
fmt.Println("goroutine 执行了")
}()
// sleep 1 秒的目的是为了 等待协执行完看到结果 不至于 主程序执行完了 协程还没结束
time.Sleep(1*time.Second)
}
channel 协程间数据通信
go里面协程间可以使用channel进行通信,不推荐直接使用变量交互数据,避免数据操作混乱。
协程数据相当于一个 管道队列机构 先进先出
无缓冲区
无缓冲区时,必须同时读取才行,否则会报错 fatal error: all goroutines are asleep - deadlock!
错误的例子
func main(){
// 定义一个int类型的channel channel 只能使用make创建
c1 := make(chan int)
// 无缓冲区 单独存 或者取 或者 先后执行存取都会报错
// c1 <- 1
n := <- c1
fmt.Println(n)
}
正确的例子
func main(){
c1 := make(chan int)
go func() {
c1 <- 1
}()
n := <- c1
// 输出1
fmt.Println(n)
}
补充说明一下,当缓冲区还没有数据进来时,读取操作会被阻塞, 比如以下例子
func main(){
c1 := make(chan int)
go func() {
// sleep 5秒
time.Sleep(5*time.Second)
c1 <- 1
}()
// 先输出 ----
fmt.Println("-----")
n := <- c1
// 等待5秒缓冲区有数据了 程序才会接着往下执行
fmt.Println("+++++")
fmt.Println(n)
}
有缓冲区
func main(){
// 声明一个channel Buffer为3 表示可以存储 3个 int 数据
c := make(chan int, 3)
c <- 1
c <- 2
c <- 8
// 取出一个数据(可以选择不接收) 队列顺序
<-c
// 查看channel的 Buffer 容量
t.Log(cap(c))
// 也可以多次取值
x, y := <-c, <-c
t.Log(fmt.Sprintf("%d + %d = %d", x, y, x+y))
}
上面的例子中,一个个的写入和读取channel中的数据,特别麻烦,于是就有循环写入和读取。需要注意的是使用range 循环读取时,channel必须得用close 来结束,
否则就回因为range 无法判断是否结束,而导致异常
func main(){
// 声明一个 string chan
c := make(chan string, 3)
// 协程异步调用 添加到chan
go func(n int, c chan string) {
for i := 0; i < n; i++ {
// 循环添加到
c <- fmt.Sprintf("--- %d ---", i)
}
// 结束添加
close(c)
}(cap(c), c)
// 循环取出
for v := range c {
t.Log(v)
}
}
多个channnel操作
使用sync.WaitGroup 等待多个goroutine同时执行完,使用select 和case 随机执行返回值
// 两个协程任务分别往不同的channel存入数据
func AsyncCh1(n int, c chan string, wg *sync.WaitGroup) {
for i := 0; i < n; i++ {
c <- fmt.Sprintf("++ %d ++", i)
}
// Done() 结束时会把之前添加的协程数量减一
wg.Done()
}
func AsyncCh2(n int, c chan string, wg *sync.WaitGroup) {
for i := n; i > 0; i-- {
c <- fmt.Sprintf("-- %d --", i)
}
wg.Done()
}
func main() {
var wg sync.WaitGroup
ch1 := make(chan string, 5)
ch2 := make(chan string, 5)
// 添加协程执行数量
wg.Add(2)
go AsyncCh1(cap(ch1), ch1, &wg)
//wg.Add(1)
go AsyncCh2(cap(ch2), ch2, &wg)
// 等待协程全部执行完
wg.Wait()
for i := 0; i < 10; i++ {
// 随机从任意的channel中取值 如果有就回立即返回
select {
case ret1 := <-ch1:
t.Log(ret1)
case ret2 := <-ch2:
t.Log(ret2)
// 设置的超时 如果任意的协程超过100毫秒就回报错
case <-time.After(time.Millisecond * 100):
t.Error("time out")
}
}
}
模拟实现超时操作,把上面的例子改造一下,然后添加超时操作。
func AsyncCh1(n int, c chan string) {
// 添加3秒sleep
time.Sleep(3*time.Second)
for i := 0; i < n; i++ {
c <- fmt.Sprintf("++ %d ++", i)
}
// 去掉等待结束完成
//wg.Done()
}
func AsyncCh2(n int, c chan string) {
time.Sleep(3*time.Second)
for i := n; i > 0; i-- {
c <- fmt.Sprintf("-- %d --", i)
}
}
func TestSelect(t *testing.T) {
// 去掉等待
//var wg sync.WaitGroup
ch1 := make(chan string, 5)
ch2 := make(chan string, 5)
go AsyncCh1(cap(ch1), ch1)
go AsyncCh2(cap(ch2), ch2)
for i := 0; i < 10; i++ {
select {
case ret1 := <-ch1:
t.Log(ret1)
case ret2 := <-ch2:
t.Log(ret2)
// 添加1秒超时检测 结果就是前两个数据会打印time out
case <-time.After(time.Second * 1):
t.Error("time out")
}
}
}
模拟生产和消费
如果是使用了close 关闭chan,那么channel 取值其实是有两个返回值的,相当于close发出了一个信号
v, ok := <-c;if ok {
fmt.Println(fmt.Sprintf("有数据 %d", v))
} else {
fmt.Println("没有数据")
}
如下完整例子
// 不断的生产
func Producer1(c chan int, wg *sync.WaitGroup) {
for i := 0; i <= 10; i++ {
fmt.Println(fmt.Sprintf("生产了++++++ %d", i))
time.Sleep(time.Millisecond * 100)
c <- i
}
// 关闭channel
close(c)
//c <- 22 // 关闭后就不能发了 panic: send on closed channel
wg.Done()
}
// 不断的从chanel里面拿
func Consumer(c chan int, wg *sync.WaitGroup) {
for {
//time.Sleep(time.Millisecond*800)
// 判断生产者是否已经停止了
v, ok := <-c
if ok {
fmt.Println(fmt.Sprintf("-------消费了 %d", v))
} else {
fmt.Println("结束")
break
}
}
wg.Done()
}
func main() {
var wg sync.WaitGroup
c := make(chan int, 20)
wg.Add(2)
go Producer1(c, &wg)
go Consumer(c, &wg)
wg.Wait()
}
仅读和仅写的channel
func Producer5(writeC chan<- int) {
for i := 0; i < 10; i++ {
fmt.Printf("生产+++%d\n", i)
writeC <- i
}
}
func Consumer5(redC <-chan int) {
for i := 0; i < 10; i++ {
fmt.Printf("-----------------消费 %d \n", <-redC)
}
}
func main() {
c := make(chan int, 15)
// 只读
var redC <-chan int = c
// 只写
var writeC chan<- int = c
// 生产
go Producer5(writeC)
// 消费
Consumer5(redC)
}
只执行一次的方法 sync.Once
var once sync.Once
func NormalFunc(i int) {
timeStr := time.Now().Format("2006-01-02 15:04:05")
fmt.Printf(" %d 测试函数 %s \n", i, timeStr)
}
func SingleFunc(i int) {
fmt.Printf("单例测试函数执行++ %d \n", i)
once.Do(func() {
// 这里面只执行一次
timeStr := time.Now().Format("2006-01-02 15:04:05")
fmt.Printf("%d------单例子测试函数 只执行一次 %s \n", i, timeStr)
})
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
NormalFunc(i)
SingleFunc(i)
wg.Done()
}(i)
}
wg.Wait()
}
利用channel 构建对象池
// 新建一个空结构体 相当于对象
type Tool struct {
name string
}
// 对象池 用于存储 Tool对象
type ToolsBox struct {
// 属性是一个 channel 内容是 Tool 结构体指针
bufChan chan *Tool
}
// 获取工具 给结构体绑定方法
func (p *ToolsBox) GetTool(timeout time.Duration) (*Tool, error) {
select {
case tool := <-p.bufChan:
return tool, nil
case <-time.After(timeout): //超时控制
return nil, errors.New("time out")
}
}
// 用完归还(释放)
func (p *ToolsBox) ReleaseTool(tool *Tool) error {
select {
case p.bufChan <- tool:
return nil
default:
return errors.New("overflow")
}
}
// new一个 ToolBox对象
func NewToolsBox(poolNum int) *ToolsBox {
objPool := ToolsBox{}
objPool.bufChan = make(chan *Tool, poolNum)
for i := 0; i < poolNum; i++ {
// 生成一个 工具结构体
tool := &Tool{fmt.Sprintf("????--%d", i)}
// 存入对象池
objPool.bufChan <- tool
}
return &objPool
}
func main() {
pool := NewToolsBox(5)
for i := 0; i < 8; i++ {
tool, err := pool.GetTool(time.Second * 1)
if err != nil {
t.Log(fmt.Sprintf("---取出有问题 %s 当前容量%d", tool, len(pool.bufChan)))
} else {
// 取出没问题
t.Log(fmt.Sprintf("----取出一个 %s 当前容量%d", tool, len(pool.bufChan)))
// 接着就释放 和判断写在一起
if err := pool.ReleaseTool(tool); err != nil {
t.Log("释放有问题")
} else {
t.Log(fmt.Sprintf("释放一个 +++ %s 当前容量%d", tool, len(pool.bufChan)))
}
}
}
t.Log("结束")
}
代码地址
github
|
请发表评论