• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

go基础第五篇:并发之sync包

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

协程是非常轻量级的,kb级,而线程是重量级的,默认是1M。

java中线程与系统线程是1:1的关系,而go中协程与系统线程是m:n的关系。KSE,kernel Space Entity。

go新建一个协程的方式就是用go关键字,示例:

func main() {
    start := time.Now()
    for i := 0; i < 3; i++ {
        go func() {
            time.Sleep(time.Second * 5)
        }()
    }
    time.Sleep(time.Second * 10)
    fmt.Println(time.Since(start))
}

总耗时10s多一点点,而不是20s,说明那5s确实是在子协程中sleep的。

在sync包中有很多实用的工具,如

sync.Mutex,它相当于java中的ReentrantLock,有lock()和unlock()方法。用法示例:

func main() {
    counter := 0
    start := time.Now()
    mutex := new(sync.Mutex)
    for i := 0; i < 50000; i++ {
        go func() {
            defer func() {
                mutex.Unlock()
            }()
            mutex.Lock()
            counter++
        }()
    }
    time.Sleep(time.Second * 1)
    fmt.Println(counter, ",", time.Since(start))
}

在defer中调用unlock方法。

sync.RWMutex,它相当于java中的ReentrantReadWriteLock,内部有一个读锁和写锁。用法同sync.Mutex。

sync.WaitGroup,它相当于java中的CountDownLatch,主协程等待一堆子协程执行完之后才继续执行。用法示例:

func main() {
    counter := 0
    mutex := new(sync.Mutex)
    start := time.Now()
    var waitGroup = new(sync.WaitGroup)
    for i := 0; i < 50000; i++ {
        go func() {
            waitGroup.Add(1)
            defer func() {
                mutex.Unlock()
            }()
            mutex.Lock()
            counter++
            waitGroup.Done()
        }()
    }
    waitGroup.Wait()
    fmt.Println(counter, ",", time.Since(start))
}

这50000个子任务实际只花了不到20ms,不用苦等1s那么久。

sync.Mutex、sync.RWMutex、sync.WaitGroup都是基于共享内存的并发机制,这种机制是在编程语言中比较通用的机制,java也有。go不推荐使用共享内存机制,而是推荐使用CSP并发模型机制。

CSP全称是Communicating Sequential Processes,可以翻译成通信顺序进程(Communicating翻译成通信的意思),在1977年被一位英国计算机科学家提出(这个科学家还发明了快排)。简单解释就是,CSP模型由并发执行的实体组成,实体之间通过发送消息进行通信,发送消息时使用的是通道。CSP模型的关键是通道,而不是发送消息的实体。口诀是Do not communicate by sharing memory,instead,share memory by communicating。不要以共享内存的方式来通信,相反,要通过通信来共享内存。

go的CSP并发模型,是通过goroutine和通道channel实现的。goroutine是并发执行的实体,底层使用协程(coroutine)实现并发。coroutine运行在用户态,从而避免了内核态和用户态的切换导致的成本。

channel可分为两种:

一种是普通channel,生产者往通道中放数据时必须有消费者从通道中取,不然生产者会阻塞;同样的,消费者从通道中取数据时,如果没有生产者往通道中放数据,消费者也会阻塞。

另一种是buffered channel,创建时要指定通道的大小,在未达到指定大小时,生产者可以任意往通道中放数据,不会阻塞,直到达到指定大小后,阻塞。

建channel也是用make关键字:

建普通channel,语法是make(chan type)或者make(chan type, 0),如ch := make(chan string),通道想放什么类型的数据都行,如int、string,甚至可以是interface{}。

建buffered channel,语法是make(chan type, value),value是个大于0的整数,如ch := make(chan string, 10),即只需要在make函数中添加第二个参数,指定channel的大小。

往通道中放数据、从通道中取数据,都要用到一个特殊的操作符<-,小于号后面跟一个中横线,好像一个左箭头,箭头的指向就是数据的流向,往通道中放数据,箭头要指向通道,通道<-,从通道中取数据,箭头要背向通道,<-通道。

func service() string {
    time.Sleep(time.Second * 1)
    return "Done"
}

func otherTask() {
    fmt.Println("working on something else")
    time.Sleep(time.Second * 2)
    fmt.Println("Task is done")
}

func main() {
    start := time.Now()
    fmt.Println(service())
    otherTask()
    fmt.Println("cost", time.Since(start))
}

上例中,service函数需要执行1s,otherTask函数需要执行2s,上面这个程序,会先执行service函数,再执行otherTask函数,总耗时在3s。

现在我们要求总耗时2s,且能够在主协程中获取service函数的返回值。观察otherTask函数和service函数,otherTask函数和service函数的返回值没有关系,所以可以并行执行service函数和otherTask。把service函数另起一个协程执行,那么如何在主协程中获取子协程中的值呢?建个channel就好了,子协程往channel中放,主协程从channel中取。

改造如下:

func service() string {
    time.Sleep(time.Second * 1)
    return "Done"
}

func asyncService(ch chan string) {
    go func() {
        result := service()
        ch <- result
    }()
}

func otherTask() {
    fmt.Println("working on something else")
    time.Sleep(time.Second * 2)
    fmt.Println("Task is done")
}

func main() {
    start := time.Now()
    ch := make(chan string)
    asyncService(ch)
    otherTask()
    result := <-ch
    fmt.Println("result= " + result)
    fmt.Println("cost", time.Since(start))
}

如上,在主协程中创建了一个channel,在子协程中执行service函数,并将返回值放入主协程创建的channel,在主协程中就可以从channel中取数据了。用的channel是一个普通channel,子协程执行完service函数后,把service函数返回值放到channel时会阻塞,因为otherTask函数还没执行完,主协程还不会从channel中取数据,直到otherTask函数执行完,主协程从channel中取数据,子协程才能把service函数返回值放到channel,主协程取出并使用。

我们还可以优化一下,把子协程解放出来,没必要阻塞一段时间,占用资源。用buffered channel替换普通channel,优化后代码如下:

func service() string {
    time.Sleep(time.Second * 1)
    return "Done"
}

func asyncService(ch chan string) {
    go func() {
        result := service()
        fmt.Println("service执行完毕")
        ch <- result
        fmt.Println("channel放入完毕")
    }()
}

func otherTask() {
    fmt.Println("working on something else")
    time.Sleep(time.Second * 2)
    fmt.Println("Task is done")
}

func main() {
    start := time.Now()
    ch := make(chan string, 1)
    asyncService(ch)
    otherTask()
    result := <-ch
    fmt.Println("result= " + result)
    fmt.Println("cost", time.Since(start))
}

只改动了一点,即把ch := make(chan string)改为了ch := make(chan string, 1)

 

多路选择和超时控制

select case块,语法糖是:

    select {
    case ret := <-ch1:
        fmt.Println("A", ret)
    case ret := <-ch2:
        fmt.Println("B", ret)
    }

示例如下:

func main() {
    ch1 := make(chan string)
    go handleChanel(ch1)

    ch2 := make(chan string)
    go handleChanel(ch2)

    time.Sleep(time.Second * 1)
    select {
    case ret := <-ch1:
        fmt.Println("A", ret)
    case ret := <-ch2:
        fmt.Println("B", ret)
    default:
        fmt.Println("DEFAULT")
    }

}

func handleChanel(ch chan string) {
    rd := rand.Intn(10)
    time.Sleep(time.Second * time.Duration(rd))
    ch <- strconv.Itoa(rd)
}

如果没有default,那么当执行到select case块时 ,假如所有的case都阻塞,那么当前协程就会阻塞,直到某一个case不阻塞;假如部分case阻塞,那么当前协程会找一个不阻塞的case执行(随机,不一定是先声明的case)。

如果有default,那么当执行到select case块时 ,假如所有的case都阻塞,那么当前协程会执行default;假如部分case阻塞,那么当前协程会找一个不阻塞的case执行(同样是随机的)。

利用多路选择机制,我们可以用来设计超时控制:

语法糖是:

    select {
    case ret := <-ch1:
        fmt.Println("A", ret)
    case <-time.After(time.Second * 5):
        fmt.Println("超时")
    }

示例如下:

func main() {
    ch1 := make(chan string)
    go handleChanel(ch1)

    ch2 := make(chan string)
    go handleChanel(ch2)

    select {
    case ret := <-ch1:
        fmt.Println("A", ret)
    case <-time.After(time.Second * 5):
        fmt.Println("超时")
    }

}

func handleChanel(ch chan string) {
    rd := rand.Intn(10)
    time.Sleep(time.Second * time.Duration(rd))
    ch <- strconv.Itoa(rd)
}

channel关闭 close(ch)

向关闭的channel发送数据,会导致协程panic。

v, ok := <-ch

ok为true时,表示channel未关闭,可正常从通道中取数据;ok为false时,表示通道已关闭,取出来的数据为相应数据类型的零值,如0,空字符串,nil等等。

channel关闭的广播机制

所有的channel接收者在channel关闭时都会立刻从阻塞等待中返回,且上述ok值为false。由此,可用于同时向多个订阅者发送信号,如退出信号。

可以用range遍历channel,在遍历前要确保信道处于关闭状态,否则循环会阻塞。

func main() {
    c := make(chan int, 3)

    c <- 1
    c <- 2
    c <- 3

    close(c)
    for k := range c {
        fmt.Println(k)
    }
}

 

 

 

 

 

用sync.Once可以保证某段代码只执行一次,示例如下:

func main() {
    var once sync.Once
    once.Do(func() {
        fmt.Println(1)
    })
    once.Do(func() {
        fmt.Println(2)
    })
}

我们利用sync.Once可以实现单例模式,也可以用来在高并发下刷新缓存,等等。

所有任务完成之后,才往下执行,这种需求,除了可以用sync.WaitGroup外,还可以用buffered channel,示例如下:

func main() {
    fmt.Println("A")

    num := 10

    ch := make(chan string, num)
    for i := 0; i < num; i++ {
        go func(i int) {
            defer func() {
                ch <- ""
                if r := recover(); r != nil {

                }
            }()
            fmt.Println(i)

        }(i)
    }

    for i := 0; i < num; i++ {
        <-ch
    }
    fmt.Println("B")
}

推荐使用buffered channel替代sync.WaitGroup。

利用buffered channel还可以实现对象池,如连接池。可以看下gorm的源码,看看gorm的连接池是怎么设计。

sync.Pool的目的是保存和复用临时对象,减少内存分配,减轻GC压力。

用Pool对象的Get()方法取,Put(x interface{})放。

用法示例:

func main() {
    p := &sync.Pool{
        New: func() interface{} {
            fmt.Println("耗时严重")
            return 0
        },
    }
    a := p.Get().(int)
    p.Put(1)
    b := p.Get().(int)
    fmt.Println(a, b)

    a = p.Get().(int)
    p.Put(1)
    runtime.GC() // 手动调用GC
    b = p.Get().(int)
    fmt.Println(a, b)
}

sync.Pool中的元素在GC时会被清空,我们可以调用runtime.GC()手动GC来验证。正常来说,一个服务在运行过程中,GC时间是不确定的,所以sync.Pool中是否有数据也不一定。如果sync.Pool中没有数据,那么Get时会调用创建Pool对象时NEW指定的函数生成数据并返回,第一次Get也是这样。从sync.Pool中get,用完后,别忘了put,否则会一直调用NEW对应的函数生成新数据,这样就达不到复用临时对象,减少内存分配的目的了。

sync.Pool不是只可以放一个数据,而是可以放任意多个数据,get取出的数据是乱序的,所以不能当做一个队列或者栈使用。

sync.Pool在fmt包中的各种打印方法中有使用,如fmt.Println()源码中p := newPrinter(),newPrinter()会从sync.Pool中取出一个可复用的对象,p.free(),就是把复用对象再放到sync.Pool中。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Go - map发布时间:2022-07-10
下一篇:
go语言之进阶篇定时器停止发布时间:2022-07-10
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap