• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Go控制协裎并发数量的用法及实际中的一个案例 使用waitgroup在循环中开Gorou ...

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

实际中并发的问题

  之前有一篇文章介绍了在Go中使用waitGroup开启Goroutine并发处理任务的小demo:使用waitgroup在循环中开Goroutine处理并发任务

  上面的这种写法其实有一个bug,比如说实际中我有上万个任务需要执行,使用上面的写法会在默认情况下在一个线程中开尽可能多的协裎去处理这一万个任务。但是如果任务请求的服务有RateLimit(限流)的话就发生问题了:同时开上万个任务去并发请求数据,绝大多数情况下都会触发服务端的RateLimit的限制,这样会导致很多任务失败以至于影响整个大任务的结果(甚至还不如同步执行,虽然效率低至少结果准确)!

解决方案

  为了避免默认情况下开启太多的Goroutine引发超出RateLimit以至于影响整个任务的结果的情况,我们可以限制主线程中同时执行的Goroutine的数量来避免请求超出RateLimit。

代码写法

  简单描述一下需求:现在有几十个广告账号,每个广告账号下又有上万条广告,要求并发获取每个广告账户下的所有广告的insights信息。

  代码的写法很简单,需要大家多理解一下两个常量的作用:

const (
    WORKER_COUNT   = 30 // 同时执行任务的协裎的数量
    PER_WORKER_MIN = 25 // 每个协裎最少执行的任务数量
)


accountLst, _ := app.GetAllAccounts()
// 遍历账户列表 批量去处理账户下的Ad的数据
for _, accObj := range accountLst {
    // 控制协裎数量的写法
    // 初始化一个 WaitGroup对象,等协裎执行完后再走主线程
    wait := sync.WaitGroup{}
    // 获取某个账户下的所有ad
    adList, _ := app.GetAllAccountAds()// 并发处理每个ad的数据
    totalCount := len(adList)
    // 每个协裎处理的任务数
    countPerWorker := (totalCount / WORKER_COUNT) + 1 
    // 做一下保护,如果有协裎处理的任务数少于规定的了,就给他添加一下
    if countPerWorker < PER_WORKER_MIN {
        countPerWorker = PER_WORKER_MIN
    }
    currentIndex := 0
    for a := 0; a < WORKER_COUNT; a++ { 
        // 做一下保护,超出了总任务数不执行下面的逻辑了
        if currentIndex >= totalCount {
            continue
        }
        wait.Add(1) // tag+1
        // 这里开始开启协裎。。。。。
        go func(startIndex, endIndex int) {
            defer wait.Done() // tag-1
            // mlog.Info(fmt.Sprintf("Start Work %d, %d\n", startIndex, endIndex))
            for i := startIndex; i < endIndex; i++ {
                if i >= totalCount {
                    break
                }
                adObj := adList[i]
                adId := adObj.Id
                //mlog.Debug(fmt.Sprintf("account %s 的第 %d 个Ad!\n", accountId, i))
                // 执行逻辑代码。。。。。。
                if err := task.handleAdInsights(RefreshToken, adId, app); err != nil {
                    mlog.Error(err.Error())
                    continue
                }
                //if i%50 == 0 {
                //    mlog.Info(fmt.Sprintf("Finish Work %d, %d, current %d\n", startIndex, endIndex, i))
                //}
            }
            //mlog.Info(fmt.Sprintf("Finish Work %d, %d\n", startIndex, endIndex))
        }(currentIndex, currentIndex+countPerWorker)
        currentIndex = currentIndex + countPerWorker
    }
    // 协裎结束后走主线程
    wait.Wait()
}

~~~


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
go-hystrix熔断机制发布时间:2022-07-10
下一篇:
GO学习笔记之map发布时间:2022-07-10
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap