在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
#总所周知Go 是一个自动垃圾回收的编程语言,采用三色并发标记算法标记对象并回收。如果你想使用 Go 开发一个高性能的应用程序的话,就必须考虑垃圾回收给性能带来的影响。因为Go 在垃圾回收的时候会有一个STW(stop-the-world,程序暂停)的时间,并且如果对象太多,做标记也需要时间。所以如果采用对象池来创建对象,增加对象的重复利用率,使用的时候就不必在堆上重新创建对象可以节省开销。在Go中,golang提供了对象重用的机制,也就是sync.Pool对象池。 sync.Pool是可伸缩的,并发安全的。其大小仅受限于内存的大小,可以被看作是一个存放可重用对象的值的容器。 设计的目的是存放已经分配的但是暂时不用的对象,在需要用到的时候直接从pool中取。 任何存放区其中的值可以在任何时候被删除而不通知,在高负载下可以动态的扩容,在不活跃时对象池会收缩。它对外提供了三个方法:New、Get 和 Put。下面用一个简短的例子来说明一下Pool使用: package main import ( "fmt" "sync" ) var pool *sync.Pool type Person struct { Name string } func init() { pool = &sync.Pool{ New: func() interface{} { fmt.Println("creating a new person") return new(Person) }, } } func main() { person := pool.Get().(*Person) fmt.Println("Get Pool Object1:", person) person.Name = "first" pool.Put(person) fmt.Println("Get Pool Object2:", pool.Get().(*Person)) fmt.Println("Get Pool Object3:", pool.Get().(*Person)) } 结果: creating a new person Get Pool Object1: &{} Get Pool Object2: &{first} creating a new person Get Pool Object3: &{} 这里我用了init方法初始化了一个pool,然后get了三次,put了一次到pool中,如果pool中没有对象,那么会调用New函数创建一个新的对象,否则会从put进去的对象中获取。 存储在池中的任何项目都可以随时自动删除,并且不会被通知。Pool可以安全地同时使用多个goroutine。池的目的是缓存已分配但未使用的对象以供以后重用,从而减轻对gc的压力。也就是说,它可以轻松构建高效,线程安全的free列表。但是,它不适用于所有free列表。池的适当使用是管理一组默认共享的临时项,并且可能由包的并发独立客户端重用。池提供了一种在许多客户端上分摊分配开销的方法。很好地使用池的一个例子是fmt包,它维护一个动态大小的临时输出缓冲区存储。底层存储队列在负载下(当许多goroutine正在积极打印时)进行缩放,并在静止时收缩。另一方面,作为短期对象的一部分维护的空闲列表不适合用于池, 因为在该场景中开销不能很好地摊销。 使这些对象实现自己的空闲列表更有效。首次使用后不得复制池。 pool 的两个特点 在多个goroutine之间使用同一个pool做到高效,是因为sync.pool为每个P都分配了一个子池, 源码分析type Pool struct { // 不允许复制,一个结构体,有一个Lock()方法,嵌入别的结构体中,表示不允许复制 // noCopy对象,拥有一个Lock方法,使得Cond对象在进行go vet扫描的时候,能够被检测到是否被复制 noCopy noCopy //local 和 localSize 维护一个动态 poolLocal 数组 // 每个固定大小的池, 真实类型是 [P]poolLocal // 其实就是一个[P]poolLocal 的指针地址 local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal localSize uintptr // size of the local array victim unsafe.Pointer // local from previous cycle victimSize uintptr // size of victims array // New optionally specifies a function to generate // a value when Get would otherwise return nil. // It may not be changed concurrently with calls to Get. // New 是一个回调函数指针,当Get 获取到目标对象为 nil 时,需要调用此处的回调函数用于生成 新的对象 New func() interface{} }
local是一个poolLocal数组的指针,localSize代表这个数组的大小;同样victim也是一个poolLocal数组的指针,每次垃圾回收的时候,Pool 会把 victim 中的对象移除,然后把 local 的数据给 victim;local和victim的逻辑我们下面会详细介绍到。 New函数是在创建pool的时候设置的,当pool没有缓存对象的时候,会调用New方法生成一个新的对象。 下面我们对照着pool的结构图往下讲,避免找不到北: // Local per-P Pool appendix. /* 因为poolLocal中的对象可能会被其他P偷走, private域保证这个P不会被偷光,至少保留一个对象供自己用。 否则,如果这个P只剩一个对象,被偷走了, 那么当它本身需要对象时又要从别的P偷回来,造成了不必要的开销 */ type poolLocalInternal struct { private interface{} // Can be used only by the respective P. shared poolChain // Local P can pushHead/popHead; any P can popTail. } type poolLocal struct { poolLocalInternal // Prevents false sharing on widespread platforms with // 128 mod (cache line size) = 0 . /** cache使用中常见的一个问题是false sharing。 当不同的线程同时读写同一cache line上不同数据时就可能发生false sharing。 false sharing会导致多核处理器上严重的系统性能下降。 字节对齐,避免 false sharing (伪共享) */ pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte } local字段存储的是一个poolLocal数组的指针,poolLocal数组大小是goroutine中P的数量,访问时,P的id对应poolLocal数组下标索引,所以Pool的最大个数runtime.GOMAXPROCS(0)。 通过这样的设计,每个P都有了自己的本地空间,多个 goroutine 使用同一个 Pool 时,减少了竞争,提升了性能。如果对goroutine的P、G、M有疑惑的同学不妨看看这篇文章:The Go scheduler。 poolLocal里面有一个pad数组用来占位用,防止在 cache line 上分配多个 poolLocalInternal从而造成false sharing,cache使用中常见的一个问题是false sharing。当不同的线程同时读写同一cache line上不同数据时就可能发生false sharing。false sharing会导致多核处理器上严重的系统性能下降。具体的可以参考伪共享(False Sharing)。 poolLocalInternal包含两个字段private和shared。 private代表缓存的一个元素,只能由相应的一个 P 存取。因为一个 P 同时只能执行一个 goroutine,所以不会有并发的问题;所以无需加锁 shared则可以由任意的 P 访问,但是只有本地的 P 才能 pushHead/popHead,其它 P 可以 popTail。因为可能有多个goroutine同时操作,所以需要加锁。 type poolChain struct { // head is the poolDequeue to push to. This is only accessed // by the producer, so doesn't need to be synchronized. head *poolChainElt // tail is the poolDequeue to popTail from. This is accessed // by consumers, so reads and writes must be atomic. tail *poolChainElt } type poolChainElt struct { poolDequeue // next and prev link to the adjacent poolChainElts in this // poolChain. // // next is written atomically by the producer and read // atomically by the consumer. It only transitions from nil to // non-nil. // // prev is written atomically by the consumer and read // atomically by the producer. It only transitions from // non-nil to nil. next, prev *poolChainElt } type poolDequeue struct { // headTail packs together a 32-bit head index and a 32-bit // tail index. Both are indexes into vals modulo len(vals)-1. // // tail = index of oldest data in queue // head = index of next slot to fill // // Slots in the range [tail, head) are owned by consumers. // A consumer continues to own a slot outside this range until // it nils the slot, at which point ownership passes to the // producer. // // The head index is stored in the most-significant bits so // that we can atomically add to it and the overflow is // harmless. headTail uint64 // vals is a ring buffer of interface{} values stored in this // dequeue. The size of this must be a power of 2. // // vals[i].typ is nil if the slot is empty and non-nil // otherwise. A slot is still in use until *both* the tail // index has moved beyond it and typ has been set to nil. This // is set to nil atomically by the consumer and read // atomically by the producer. vals []eface } type eface struct { typ, val unsafe.Pointer } poolChain是一个双端队列,里面的head和tail分别指向队列头尾;poolDequeue里面存放真正的数据,是一个单生产者、多消费者的固定大小的无锁的环状队列,headTail是环状队列的首位位置的指针,可以通过位运算解析出首尾的位置,生产者可以从 head 插入、head 删除,而消费者仅可从 tail 删除。 这个双端队列的模型大概是这个样子: poolDequeue里面的环状队列大小是固定的,后面分析源码我们会看到,当环状队列满了的时候会创建一个size是原来两倍大小的环状队列。大家这张图好好体会一下,会反复用到。 Get方法#// Get selects an arbitrary item from the Pool, removes it from the // Pool, and returns it to the caller. // Get may choose to ignore the pool and treat it as empty. // Callers should not assume any relation between values passed to Put and // the values returned by Get. // // If Get would otherwise return nil and p.New is non-nil, Get returns // the result of calling p.New. func (p *Pool) Get() interface{} { if race.Enabled { race.Disable() } l, pid := p.pin() //1.把当前goroutine绑定在当前的P上 x := l.private //2.优先从local的private中获取 l.private = nil if x == nil { // Try to pop the head of the local shard. We prefer // the head over the tail for temporal locality of // reuse. x, _ = l.shared.popHead() //3,private没有,那么从shared的头部获取 if x == nil { x = p.getSlow(pid) //4. 如果都没有,那么去别的local上去偷一个 } } runtime_procUnpin() //解除抢占 if race.Enabled { race.Enable() if x != nil { race.Acquire(poolRaceAddr(x)) } } //5. 如果没有获取到,尝试使用New函数生成一个新的 if x == nil && p.New != nil { x = p.New() } return x }
pin# // pin 会将当前 goroutine 订到 P 上, 禁止抢占(preemption) 并从 poolLocal 池中返回 P 对应的 poolLocal // 调用方必须在完成取值后调用 runtime_procUnpin() 来取消禁止抢占。 // pin pins the current goroutine to P, disables preemption and // returns poolLocal pool for the P and the P's id. // Caller must call runtime_procUnpin() when done with the pool. func (p *Pool) pin() (*poolLocal, int) { pid := runtime_procPin() // In pinSlow we store to local and then to localSize, here we load in opposite order. // Since we've disabled preemption, GC cannot happen in between. // Thus here we must observe local at least as large localSize. // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness). // 因为可能存在动态的 P(运行时调整 P 的个数)procresize/GOMAXPROCS // 如果 P.id 没有越界,则直接返回 PID /** 具体的逻辑就是首先拿到当前的pid, 然后以pid作为index找到local中的poolLocal, 但是如果pid大于了localsize, 说明当前线程的poollocal不存在,就会新创建一个poolLocal */ s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume if uintptr(pid) < s { return indexLocal(l, pid), pid } // 没有结果时,涉及全局加锁 // 例如重新分配数组内存,添加到全局列表 return p.pinSlow() } func indexLocal(l unsafe.Pointer, i int) *poolLocal { lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{})) return (*poolLocal)(lp) } pin方法里面首先会调用runtime_procPin方法会先获取当前goroutine,然后绑定到对应的M上,然后返回M目前绑定的P的id,因为这个pid后面会用到,防止在使用途中P被抢占,具体的细节可以看这篇:https://zhuanlan.zhihu.com/p/99710992。接下来会使用原子操作取出localSize,如果当前pid大于localSize,那么就表示Pool还没创建对应的poolLocal,那么调用pinSlow进行创建工作,否则调用indexLocal取出pid对应的poolLocal返回。 indexLocal里面是使用了地址操作,传入的i是数组的index值,所以需要获取poolLocal{}的size做一下地址的位移操作,然后再转成转成poolLocal地址返回。 pinSlow# func (p *Pool) pinSlow() (*poolLocal, int) { // Retry under the mutex. // Can not lock the mutex while pinned. //因为需要对全局进行加锁,pinSlow() 会首先取消 P 的不可抢占,然后使用 allPoolsMu 进行加锁 runtime_procUnpin() // 解除pin allPoolsMu.Lock() // 加上全局锁 defer allPoolsMu.Unlock() pid := runtime_procPin() // pin住 // poolCleanup won't be called while we are pinned. s := p.localSize l := p.local // 重新对pid进行检查 再次检查是否符合条件,因为可能中途已被其他线程调用 // 当再次固定 P 时 poolCleanup 不会被调用 if uintptr(pid) < s { return indexLocal(l, pid), pid } // 初始化local前会将pool放入到allPools数组中 if p.local == nil { allPools = append(allPools, p) } // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. size := runtime.GOMAXPROCS(0) // 当前P的数量 local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release return &local[pid], pid } 因为allPoolsMu是一个全局Mutex锁,因此上锁会比较慢可能被阻塞,所以上锁前调用runtime_procUnpin方法解除pin的操作; 在解除绑定后,pinSlow 可能被其他的线程调用过了,p.local 可能会发生变化。因此这时候需要再次对 pid 进行检查。 最后初始化local,并使用原子操作对local和localSize设值,返回当前P对应的local。 到这里pin方法终于讲完了。画一个简单的图描述一下这整个流程: 下面我们再回到Get方法中,如果private中没有值,那么会调用shared的popHead方法获取值。 popHead# func (c *poolChain) popHead() (interface{}, bool) { d := c.head // 这里头部是一个poolChainElt // 遍历poolChain链表 for d != nil { // 从poolChainElt的环状列表中获取值 if val, ok := d.popHead(); ok { return val, ok } // There may still be unconsumed elements in the // previous dequeue, so try backing up. // load poolChain下一个对象 d = loadPoolChainElt(&d.prev) } return nil, false } // popHead removes and returns the element at the head of the queue. // It returns false if the queue is empty. It must only be called by a // single producer. func (d *poolDequeue) popHead() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // headTail的高32位为head,低32位为tail if tail == head { // Queue is empty. // 首尾相等,那么这个队列就是空的 return nil, false } // Confirm tail and decrement head. We do this before // reading the value to take back ownership of this // slot. head-- // 这里需要head--之后再获取slot ptrs2 := d.pack(head, tail) if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { // We successfully took back slot. slot = &d.vals[head&uint32(len(d.vals)-1)] break } } val := *(*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { // 说明没取到缓存的对象,返回 nil val = nil } // Zero the slot. Unlike popTail, this isn't racing with // pushHead, so we don't need to be careful here. *slot = eface{} // 重置slot return val, true }
poolChain的popHead方法里面会获取到poolChain的头结点,不记得poolChain数据结构的同学建议往上面翻一下再回来。接着有个for循环会挨个从poolChain的头结点往下遍历,直到获取对象返回。
如果shared的popHead方法也没获取到值,那么就需要调用getSlow方法获取了。 getSlow# // 从其他P的共享缓冲区偷取 obj func (p *Pool) getSlow(pid int) interface{} { // See the comment in pin regarding ordering of the loads. size := atomic.LoadUintptr(&p.localSize) // load-acquire 获取当前 poolLocal 的大小 locals := p.local // load-consume 获取当前 poolLocal // Try to steal one element from other procs. // 遍历locals列表,从其他的local的shared列表尾部获取对象 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i+1)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // Try the victim cache. We do this after attempting to steal // from all primary caches because we want objects in the // victim cache to age out if at all possible. size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } locals = p.victim l := indexLocal(locals, pid) // victim的private不为空则返回 if x := l.private; x != nil { l.private = nil return x } // 遍历victim对应的locals列表,从其他的local的shared列表尾部获取对象 for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, _ := l.shared.popTail(); x != nil { return x } } // Mark the victim cache as empty for future gets don't bother // with it. // 获取不到,将victimSize置为0 atomic.StoreUintptr(&p.victimSize, 0) return nil }
getSlow方法会遍历locals列表,这里需要注意的是,遍历是从索引为 pid+1 的 poolLocal 处开始,尝试调用shared的popTail方法获取对象;如果没有拿到,则从 victim 里找。如果都没找到,那么就将victimSize置为0,下次就不找victim了。 poolChain&popTail# func (c *poolChain) popTail() (interface{}, bool) { d := loadPoolChainElt(&c.tail) if d == nil { return nil, false // 如果最后一个节点是空的,那么直接返回 } for { // It's important that we load the next pointer // *before* popping the tail. In general, d may be // transiently empty, but if next is non-nil before // the pop and the pop fails, then d is permanently // empty, which is the only condition under which it's // safe to drop d from the chain. // 这里获取的是next节点,与一般的双向链表是相反的 d2 := loadPoolChainElt(&d.next) if val, ok := d.popTail(); ok { return val, ok } if d2 == nil { // This is the only dequeue. It's empty right // now, but could be pushed to in the future. return nil, false } // The tail of the chain has been drained, so move on // to the next dequeue. Try to drop it from the chain // so the next pop doesn't have to look at the empty // dequeue again. // 因为d已经没有数据了,所以重置tail为d2,并删除d2的上一个节点 if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { // We won the race. Clear the prev pointer so // the garbage collector can collect the empty // dequeue and so popHead doesn't back up // further than necessary. storePoolChainElt(&d2.prev, nil) } d = d2 } }
poolDequeue&popTail# // popTail removes and returns the element at the tail of the queue. // It returns false if the queue is empty. It may be called by any // number of consumers. func (d *poolDequeue) popTail() (interface{}, bool) { var slot *eface for { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) // 和pophead一样,将headTail解包 if tail == head { // Queue is empty. // 首位相等,表示列表中没有数据,返回 return nil, false } // Confirm head and tail (for our speculative check // above) and increment tail. If this succeeds, then // we own the slot at tail. ptrs2 := d.pack(head, tail+1) // CAS重置tail位置 if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { // Success. // 获取tail位置对象 slot = &d.vals[tail&uint32(len(d.vals)-1)] break } } // We now own slot. val := *(*interface{})(unsafe.Pointer(slot)) if val == dequeueNil(nil) { val = nil } // Tell pushHead that we're done with this slot. Zeroing the // slot is also important so we don't leave behind references // that could keep this object live longer than necessary. // // We write to val first and then publish that we're done with // this slot by atomically writing to typ. slot.val = nil atomic.StorePointer(&slot.typ, nil) // At this point pushHead owns the slot. return val, true } 如果看懂了popHead,那么这个popTail方法是和它非常的相近的。 popTail简单来说也是从队列尾部移除一个元素,如果队列为空,返回 false。但是需要注意的是,这个popTail可能会被多个消费者调用,所以需要循环CAS获取对象;在poolDequeue环状列表中tail是有数据的,不必像popHead中 最后,需要将slot置空。 大家可以再对照一下图回顾一下代码: Put方法#// Put adds x to the pool. func (p *Pool) Put(x interface{}) { if x == nil { return } if race.Enabled { if fastrand()%4 == 0 { // Randomly drop x on floor. return } race.ReleaseMerge(poolRaceAddr(x)) race.Disable() } l, _ := p.pin() // 先获得当前P绑定的 localPool if l.private == nil { l.private = x x = nil } if x != nil { l.shared.pushHead(x) } // 调用方必须在完成取值后调用 runtime_procUnpin() 来取消禁用抢占 runtime_procUnpin() if race.Enabled { race.Enable() } }
看完了Get方法,看Put方法就容易多了。同样Put方法首先会去Pin住当前goroutine和P,然后尝试将 x 赋值给 private 字段。如果private不为空,那么就调用pushHead将其放入到shared队列中。 poolChain&pushHead# func (c *poolChain) pushHead(val interface{}) { d := c.head // 头节点没有初始化,那么设值一下 if d == nil { // Initialize the chain. const initSize = 8 // Must be a power of 2 d = new(poolChainElt) d.vals = make([]eface, initSize) c.head = d storePoolChainElt(&c.tail, d) } // 将对象加入到环状队列中 if d.pushHead(val) { return } // The current dequeue is full. Allocate a new one of twice // the size. newSize := len(d.vals) * 2 // 这里做了限制,单个环状队列不能超过2的30次方大小 if newSize >= dequeueLimit { // Can't make it any bigger. newSize = dequeueLimit } // 初始化新的环状列表,大小是d的两倍 d2 := &poolChainElt{prev: d} d2.vals = make([]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) // push到新的队列中 d2.pushHead(val) }
如果头节点为空,那么需要创建一个新的poolChainElt对象作为头节点,大小为8;然后调用pushHead放入到环状队列中; 如果放置失败,那么创建一个 poolChainElt 节点,并且双端队列的长度翻倍,当然长度也不能超过dequeueLimit,即2的30次方; 然后将新的节点d2和d互相绑定一下,并将d2设值为头节点,将传入的对象push到d2中; poolDequeue&pushHead# // pushHead adds val at the head of the queue. It returns false if the // queue is full. It must only be called by a single producer. func (d *poolDequeue) pushHead(val interface{}) bool { ptrs := atomic.LoadUint64(& 全部评论
专题导读
热门推荐
热门话题
阅读排行榜
|
请发表评论