• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

go一步步实现GoroutinePool

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

 

超大规模并发的场景下,不加限制的大规模的goroutine可能造成内存暴涨,给机器带来极大的压力,吞吐量下降和处理速度变慢。

而实现一个Goroutine Pool,复用goroutine,减轻runtime的调度压力以及缓解内存压力,依托这些优化,在大规模goroutine并发的场景下可以极大地提高并发性能。

 

 

 

 

Pool类型

type Pool struct {
	// capacity of the pool.
	//capacity是该Pool的容量,也就是开启worker数量的上限,每一个worker需要一个goroutine去执行;
	//worker类型为任务类。
	capacity int32
	// running is the number of the currently running goroutines.
	//running是当前正在执行任务的worker数量
	running int32
	// expiryDuration set the expired time (second) of every worker.
	//expiryDuration是worker的过期时长,在空闲队列中的worker的最新一次运行时间与当前时间之差如果大于这个值则表示已过期,定时清理任务会清理掉这个worker;
	expiryDuration time.Duration
	// workers is a slice that store the available workers.
	//任务队列
	workers []*Worker
	// release is used to notice the pool to closed itself.
	//当关闭该Pool支持通知所有worker退出运行以防goroutine泄露
	release chan sig
	// lock for synchronous operation
	//用以支持Pool的同步操作
	lock sync.Mutex
	//once用在确保Pool关闭操作只会执行一次
	once sync.Once
}

初始化Pool

// NewPool generates a instance of ants pool
func NewPool(size, expiry int) (*Pool, error) {
	if size <= 0 {
		return nil, errors.New("Pool Size <0,not Create")
	}
	p := &Pool{
		capacity:       int32(size),
		release:        make(chan sig, 1),
		expiryDuration: time.Duration(expiry) * time.Second,
		running:		0,
	}
	// 启动定期清理过期worker任务,独立goroutine运行,
	// 进一步节省系统资源
	p.monitorAndClear()
	return p, nil
}

获取Worker

// getWorker returns a available worker to run the tasks.
func (p *Pool) getWorker() *Worker {
	var w *Worker
	// 标志,表示当前运行的worker数量是否已达容量上限
	waiting := false
	// 涉及从workers队列取可用worker,需要加锁
	p.lock.Lock()
	workers := p.workers
	n := len(workers) - 1
	fmt.Println("空闲worker数量:",n+1)
	fmt.Println("协程池现在运行的worker数量:",p.running)
	// 当前worker队列为空(无空闲worker)
	if n < 0 {
		//没有空闲的worker有两种可能:
		//1.运行的worker超出了pool容量
		//2.当前是空pool,从未往pool添加任务或者一段时间内没有任务添加,被定期清除
		// 运行worker数目已达到该Pool的容量上限,置等待标志
		if p.running >= p.capacity {
			//print("超过上限")
			waiting = true
		} else {
			// 当前无空闲worker但是Pool还没有满,
			// 则可以直接新开一个worker执行任务
			p.running++
			w = &Worker{
				pool: p,
				task: make(chan functinType),
				str:make(chan string),
			}
		}
		// 有空闲worker,从队列尾部取出一个使用
	} else {
		//<-p.freeSignal
		w = workers[n]
		workers[n] = nil
		p.workers = workers[:n]
		p.running++
	}
	// 判断是否有worker可用结束,解锁
	p.lock.Unlock()
	if waiting {
		//当一个任务执行完以后会添加到池中,有了空闲的任务就可以继续执行:
		// 阻塞等待直到有空闲worker
		for len(p.workers) == 0{
			continue
		}
		p.lock.Lock()
		workers = p.workers
		l := len(workers) - 1
		w = workers[l]
		workers[l] = nil
		p.workers = workers[:l]
		p.running++
		p.lock.Unlock()
	}
	return w
}

定期清理过期Worker

func (p *Pool) monitorAndClear() {
	go func() {
		for {
			// 周期性循环检查过期worker并清理
			time.Sleep(p.expiryDuration)
			currentTime := time.Now()
			p.lock.Lock()
			idleWorkers := p.workers
			n := 0
			for i, w := range idleWorkers {
				// 计算当前时间减去该worker的最后运行时间之差是否符合过期时长
				if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
					break
				}
				n = i
				w.stop()
				idleWorkers[i] = nil
			}
			if n > 0 {
				n++
				p.workers = idleWorkers[n:]
			}
			p.lock.Unlock()
		}
	}()
}

复用Worker

// putWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) putWorker(worker *Worker) {
	// 写入回收时间,亦即该worker的最后运行时间
	worker.recycleTime = time.Now()
	p.lock.Lock()
	p.running --
	p.workers = append(p.workers, worker)
	p.lock.Unlock()

}

动态扩容或者缩小容量

// ReSize change the capacity of this pool
func (p *Pool) ReSize(size int) {
	cap := int(p.capacity)
	if size <  cap{
		diff := cap - size
		for i := 0; i < diff; i++ {
			p.getWorker().stop()
		}
	} else if size == cap {
		return
	}
	atomic.StoreInt32(&p.capacity, int32(size))
} 

提交Worker

// Submit submit a task to pool
func (p *Pool) Submit(task functinType,str string) error {
	if len(p.release) > 0 {
		return errors.New("Pool is Close")
	}
	//创建或得到一个空闲的worker
	w := p.getWorker()
	w.run()
	//将任务参数通过信道传递给它
	w.sendarg(str)
	//将任务通过信道传递给它
	w.sendTask(task)
	return nil
}

  

Worker类

package Poolpkg

import (
	"sync/atomic"
	"time"
)

type functinType func(string) error


// Worker is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type Worker struct {
	// pool who owns this worker.
	pool *Pool
	// task is a job should be done.
	task chan functinType
	// recycleTime will be update when putting a worker back into queue.
	recycleTime time.Time

	str chan string
}

// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *Worker) run() {

	go func() {
		//监听任务列表,一旦有任务立马取出运行
		count := 1
		var str string
		var f functinType
		for count <=2{
			select {
			case str_temp, ok := <- w.str:
				if !ok {
					return
				}
				count ++
				str = str_temp
			case f_temp, ok := <-w.task:
				if !ok {
					//如果接收到关闭
					atomic.AddInt32(&w.pool.running, -1)
					close(w.task)
					return
				}
				count  ++
				f = f_temp
			}
		}
		err := f(str)
		if err != nil{
			//fmt.Println("执行任务失败")
		}
		//回收复用
		w.pool.putWorker(w)
		return
	}()
}

// stop this worker.
func (w *Worker) stop() {
	w.sendTask(nil)
	close(w.str)
}

// sendTask sends a task to this worker.
func (w *Worker) sendTask(task functinType) {
	w.task <- task
}

func (w *Worker) sendarg(str string) {
	w.str <- str
}

  

总结和实践

怎么理解Woreker,task、Pool的关系

Woker类型其实就是task的载体,Worker类型有两个很重要的参数:

task chan functinType:用来是传递task。
str chan string:用来传递task所需的参数。

task是任务本身,它一般为一个函数,在程序中被定义为函数类型:

type functinType func(string) error

Pool存储Worker,当用户要执行一个task时,首先要得到一个Worker,必须从池中获取,获取到一个Worker后,就开启一个协程去处理,在这个协程中接收任务task和参数。

//创建或得到一个空闲的worker
w := p.getWorker()
//开协程去处理 w.run() //将任务参数通过信道传递给它 w.sendarg(str) //将任务通过信道传递给它 w.sendTask(task)

Worker怎么接收task和参数

count定义接收数据的个数,一个Woker必须接收到task和参数才能开始工作。
工作完后这个Worker被返回到Pool中,下次还可以复用这个Worker,也就是复用Worker这个实例。
go func() {
		//监听任务列表,一旦有任务立马取出运行
		count := 1
		var str string
		var f functinType
		for count <=2{
			select {
			case str_temp, ok := <- w.str:
				if !ok {
					return
				}
				count ++
				str = str_temp
			case f_temp, ok := <-w.task:
				if !ok {
					//如果接收到关闭
					atomic.AddInt32(&w.pool.running, -1)
					close(w.task)
					return
				}
				count  ++
				f = f_temp
			}
		}
		err := f(str)
		if err != nil{
			//fmt.Println("执行任务失败")
		}
		//回收复用
		w.pool.putWorker(w)
		return
	}()

Pool怎么处理用户提交task获取Worker的请求

1.先得到Pool池中空闲Worker的数量,然后判断

2.如果小于零,则表示池中没有空闲的Worker,这里有两种原因:

  • 1.运行的Worker数量超过了Pool容量,当用户获取Worker的请求数量激增,池中大多数Worker都是执行完任务的Worker重新添加到池中的,返回的Worker跟不上激增的需求。
  • 2.当前是空pool,从未往pool添加任务或者一段时间内没有Worker任务运行,被定期清除。

3.如果大于或者等于零,有空闲的Worker直接从池中获取最后一个Worker。

4.如果是第二种的第一种情况,则阻塞等待池中有空闲的Worker。

if waiting {
		//当一个任务执行完以后会添加到池中,有了空闲的任务就可以继续执行:
		// 阻塞等待直到有空闲worker
		for len(p.workers) == 0{
			continue
		}
		p.lock.Lock()
		workers = p.workers
		l := len(workers) - 1
		w = workers[l]
		workers[l] = nil
		p.workers = workers[:l]
		p.running++
		p.lock.Unlock()
	}

5.如果是第二种的第二种情况,直接创建一个Worker实例。

// 当前无空闲worker但是Pool还没有满,
// 则可以直接新开一个worker执行任务
p.running++
w = &Worker{
	pool: p,
	task: make(chan functinType),
	str:make(chan string),
}

测试

package main

import (
	"Pool/Poolpkg"
	"fmt"
)

func main(){
     //开20个大小的Pool池,过期清除时间5分钟 Pool,err := Poolpkg.NewPool(20,5) i :=0 for i < 50 { err = Pool.Submit(Print_Test1,"并发测试!") if err != nil{ fmt.Println(err) } i++ } }

 

 

 

 

源码

Pool

package Poolpkg

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type sig struct{}



// Pool accept the tasks from client,it limits the total
// of goroutines to a given number by recycling goroutines.
type Pool struct {
	// capacity of the pool.
	//capacity是该Pool的容量,也就是开启worker数量的上限,每一个worker需要一个goroutine去执行;
	//worker类型为任务类。
	capacity int32
	// running is the number of the currently running goroutines.
	//running是当前正在执行任务的worker数量
	running int32
	// expiryDuration set the expired time (second) of every worker.
	//expiryDuration是worker的过期时长,在空闲队列中的worker的最新一次运行时间与当前时间之差如果大于这个值则表示已过期,定时清理任务会清理掉这个worker;
	expiryDuration time.Duration
	// workers is a slice that store the available workers.
	//任务队列
	workers []*Worker
	// release is used to notice the pool to closed itself.
	//当关闭该Pool支持通知所有worker退出运行以防goroutine泄露
	release chan sig
	// lock for synchronous operation
	//用以支持Pool的同步操作
	lock sync.Mutex
	//once用在确保Pool关闭操作只会执行一次
	once sync.Once
}

// NewPool generates a instance of ants pool
func NewPool(size, expiry int) (*Pool, error) {
	if size <= 0 {
		return nil, errors.New("Pool Size <0,not Create")
	}
	p := &Pool{
		capacity:       int32(size),
		release:        make(chan sig, 1),
		expiryDuration: time.Duration(expiry) * time.Second,
		running:		0,
	}
	// 启动定期清理过期worker任务,独立goroutine运行,
	// 进一步节省系统资源
	p.monitorAndClear()
	return p, nil
}

// Submit submit a task to pool
func (p *Pool) Submit(task functinType,str string) error {
	if len(p.release) > 0 {
		return errors.New("Pool is Close")
	}
	//创建或得到一个空闲的worker
	w := p.getWorker()
	w.run()
	//将任务参数通过信道传递给它
	w.sendarg(str)
	//将任务通过信道传递给它
	w.sendTask(task)
	return nil
}

// getWorker returns a available worker to run the tasks.
func (p *Pool) getWorker() *Worker {
	var w *Worker
	// 标志,表示当前运行的worker数量是否已达容量上限
	waiting := false
	// 涉及从workers队列取可用worker,需要加锁
	p.lock.Lock()
	workers := p.workers
	n := len(workers) - 1
	fmt.Println("空闲worker数量:",n+1)
	fmt.Println("协程池现在运行的worker数量:",p.running)
	// 当前worker队列为空(无空闲worker)
	if n < 0 {
		//没有空闲的worker有两种可能:
		//1.运行的worker超出了pool容量
		//2.当前是空pool,从未往pool添加任务或者一段时间内没有任务添加,被定期清除
		// 运行worker数目已达到该Pool的容量上限,置等待标志
		if p.running >= p.capacity {
			//print("超过上限")
			waiting = true
		} else {
			// 当前无空闲worker但是Pool还没有满,
			// 则可以直接新开一个worker执行任务
			p.running++
			w = &Worker{
				pool: p,
				task: make(chan functinType),
				str:make(chan string),
			}
		}
		// 有空闲worker,从队列尾部取出一个使用
	} else {
		//<-p.freeSignal
		w = workers[n]
		workers[n] = nil
		p.workers = workers[:n]
		p.running++
	}
	// 判断是否有worker可用结束,解锁
	p.lock.Unlock()
	if waiting {
		//当一个任务执行完以后会添加到池中,有了空闲的任务就可以继续执行:
		// 阻塞等待直到有空闲worker
		for len(p.workers) == 0{
			continue
		}
		p.lock.Lock()
		workers = p.workers
		l := len(workers) - 1
		w = workers[l]
		workers[l] = nil
		p.workers = workers[:l]
		p.running++
		p.lock.Unlock()
	}
	return w
}

//定期清理过期Worker
func (p *Pool) monitorAndClear() {
	go func() {
		for {
			// 周期性循环检查过期worker并清理
			time.Sleep(p.expiryDuration)
			currentTime := time.Now()
			p.lock.Lock()
			idleWorkers := p.workers
			n := 0
			for i, w := range idleWorkers {
				// 计算当前时间减去该worker的最后运行时间之差是否符合过期时长
				if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
					break
				}
				n = i
				w.stop()
				idleWorkers[i] = nil
				p.running--
			}
			if n > 0 {
				n++
				p.workers = idleWorkers[n:]
			}
			p.lock.Unlock()
		}
	}()
}

//Worker回收(goroutine复用)
// putWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) putWorker(worker *Worker) {
	// 写入回收时间,亦即该worker的最后运行时间
	worker.recycleTime = time.Now()
	p.lock.Lock()
	p.running --
	p.workers = append(p.workers, worker)
	p.lock.Unlock()

}

//动态扩容或者缩小池容量
// ReSize change the capacity of this pool
func (p *Pool) ReSize(size int) {
	cap := int(p.capacity)
	if size <  cap{
		diff := cap - size
		for i := 0; i < diff; i++ {
			p.getWorker().stop()
		}
	} else if size == cap {
		return
	}
	atomic.StoreInt32(&p.capacity, int32(size))
}

Woker

package Poolpkg

import (
	"sync/atomic"
	"time"
)

type functinType func(string) error


// Worker is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type Worker struct {
	// pool who owns this worker.
	pool *Pool
	// task is a job should be done.
	task chan functinType
	// recycleTime will be update when putting a worker back into queue.
	recycleTime time.Time

	str chan string
}

// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *Worker) run() {

	go func() {
		//监听任务列表,一旦有任务立马取出运行
		count := 1
		var str string
		var f functinType
		for count <=2{
			select {
			case str_temp, ok := <- w.str:
				if !ok {
					return
				}
				count ++
				str = str_temp
			case f_temp, ok := <-w.task:
				if !ok {
					//如果接收到关闭
					atomic.AddInt32(&w.pool.running, -1)
					close(w.task)
					return
				}
				count  ++
				f = f_temp
			}
		}
		err := f(str)
		if err != nil{
			//fmt.Println("执行任务失败")
		}
		//回收复用
		w.pool.putWorker(w)
		return
	}()
}

// stop this worker.
func (w *Worker) stop() {
	w.sendTask(nil)
	close(w.str)
}

// sendTask sends a task to this worker.
func (w *Worker) sendTask(task functinType) {
	w.task <- task
}

func (w *Worker) sendarg(str string) {
	w.str <- str
}

 

 


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
电竞玩家看过来:WVA2018启动,深圳E-GO CLUB开业发布时间:2022-07-10
下一篇:
使用Go和Let's Encrypt证书部署HTTPS发布时间:2022-07-10
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap