我正在尝试在 Go 中构建一个网络爬虫,我想在其中指定并发工作人员的最大数量。只要队列中有可供探索的链接,它们都会工作。当队列的元素少于工作人员时,工作人员应该大声喊叫,但如果找到更多链接,则继续。
我试过的代码是
const max_workers = 6
// simulating links with int
func crawl(wg *sync.WaitGroup, queue chan int) {
for element := range queue {
wg.Done() // why is defer here causing a deadlock?
fmt.Println("adding 2 new elements ")
if element%2 == 0 {
wg.Add(2)
queue <- (element*100 + 11)
queue <- (element*100 + 33)
}
}
}
func main() {
var wg sync.WaitGroup
queue := make(chan int, 10)
queue <- 0
queue <- 1
queue <- 2
queue <- 3
var min int
if (len(queue) < max_workers) {
min = len(queue)
} else {
min = max_workers
}
for i := 0; i < min; i++ {
wg.Add(1)
go crawl(&wg, queue)
}
wg.Wait()
close(queue)
}
Link to playground
这似乎有效,但有一个问题:当我开始时,我必须用多个元素填充队列。我希望它从(单个)种子页面(在我的示例中 queue <- 0 )开始,然后动态地增长/缩小工作池。
我的问题是: 我怎样才能获得行为? 为什么延迟 wg.Done() 会导致死锁? wg.Done() 函数实际完成后是否正常?我认为如果没有 defer ,goroutine 不会等待另一部分完成(在解析 HTML 的实际工作示例中,这可能需要更长的时间)。
Best Answer-推荐答案 strong>
如果您使用自己喜欢的网络搜索“Go web crawler”(或“golang web crawler”) 你会发现很多例子,包括: Go Tour Exercise: Web Crawler 。 在 Go 中也有一些关于并发的讨论涵盖了这种事情。
在 Go 中执行此操作的“标准”方法根本不需要涉及 WaitGroup 。 为了回答您的一个问题,使用 defer 排队的事情只有在函数返回时才会运行。您有一个长时间运行的函数,所以不要在这样的循环中使用 defer 。
“标准”方法是在他们自己的 goroutine 中启动你想要的任意数量的 worker 。 他们都从同一个 channel 读取“作业”,在无事可做时阻塞。 完成后,该 channel 将关闭并且它们都退出。
在像爬虫这样的情况下,工作人员会发现更多的“工作”要做,并希望将它们排入队列。 你不希望他们写回同一个 channel ,因为它会有一些有限的缓冲量(或没有!),你最终会阻止所有试图排队更多工作的 worker !
一个简单的解决方案是使用单独的 channel (例如,每个 worker 都有 in <-chan Job, out chan<- Job ) 以及读取这些请求的单个队列/过滤器 goroutine, 将它们附加到一个 slice 上,它可以任意增大或进行一些全局限制, 并且还从 slice 的头部馈送另一个 channel (即从一个 channel 读取并写入另一个 channel 的简单 for-select 循环)。 此代码通常还负责跟踪已完成的工作 (例如,访问过的 URL 的映射)并丢弃传入的重复请求。
队列 goroutine 可能看起来像这样(这里的参数名称过于冗长):
type Job string
func queue(toWorkers chan<- Job, fromWorkers <-chan Job) {
var list []Job
done := make(map[Job]bool)
for {
var send chan<- Job
var item Job
if len(list) > 0 {
send = toWorkers
item = list[0]
}
select {
case send <- item:
// We sent an item, remove it
list = list[1:]
case thing := <-fromWorkers:
// Got a new thing
if !done[thing] {
list = append(list, thing)
done[thing] = true
}
}
}
}
在这个简单的例子中,一些事情被掩盖了。 比如终止。如果“Jobs”是一些更大的结构,您想要使用 chan *Job 和 []*Job 代替。 在这种情况下,您还需要将 map 类型更改为您从作业中提取的某些键 (例如 Job.URL 也许) 并且您想在 list[0] = nil 之前执行 list = list[1:] 以摆脱对 *Job 指针的引用,并让垃圾收集器更早地处理它。
编辑:关于干净地终止的一些说明。
有几种方法可以干净地终止上述代码。可以使用 WaitGroup ,但是需要小心地放置 Add/Done 调用,并且您可能需要另一个 goroutine 来执行 Wait(然后关闭一个 channel 以开始关闭)。工作人员不应该关闭他们的输出 channel ,因为有多个工作人员并且您不能多次关闭 channel ;队列 goroutine 在不知道工作人员何时完成的情况下无法告诉何时关闭它对工作人员的 channel 。
过去,当我使用与上述非常相似的代码时,我在“队列”goroutine 中使用了本地“未完成”计数器(这避免了对互斥锁或 WaitGroup 具有的任何同步开销的任何需要)。将作业发送给工作人员时,未完成作业的计数会增加。当 worker 说它已经完成时,它又减少了。我的代码碰巧有另一个 channel (除了要排队的更多节点之外,我的“队列”还收集结果)。它在自己的 channel 上可能更干净,但可以使用现有 channel 上的特殊值(例如 nil 作业指针)。无论如何,有了这样一个计数器,本地列表上现有的长度检查只需要看到当列表为空并且是时候终止时没有任何未完成的事情;只需关闭通向 worker 的 channel 并返回。
例如。:
if len(list) > 0 {
send = toWorkers
item = list[0]
} else if outstandingJobs == 0 {
close(toWorkers)
return
}
关于go - Go 中的网络爬虫,我们在Stack Overflow上找到一个类似的问题:
https://stackoverflow.com/questions/29491795/
|