【发布时间】:2021-10-24 15:13:45
【问题描述】:
我发现自己有一个工作队列,工作人员可以在处理完一个工作后添加新工作。
为了说明,在下面的代码中,一个作业包括最多计数 JOB_COUNTING_TO 和随机 1/5 的时间,一个工人将一个新作业添加到队列中。
因为我的工作人员可以将作业添加到队列中,所以据我了解,我无法将通道用作我的作业队列。这是因为发送到通道是阻塞的,即使使用缓冲通道,此代码由于其递归性质(作业添加作业)很容易达到所有工作人员都发送到通道并且没有工作人员可用的情况收到。
这就是我决定使用受互斥体保护的共享队列的原因。
现在,我希望程序在所有工作人员都空闲时停止。不幸的是,这不能仅仅通过寻找 len(jobQueue) == 0 的时间来发现,因为队列可能是空的,但一些工作人员仍在做他们的工作,之后可能会添加一个新工作。
我想出的解决方案是,我觉得有点笨拙,它利用变量var idleWorkerCount int 和var isIdle [NB_WORKERS]bool 来跟踪空闲的worker,当idleWorkerCount == NB_WORKERS 时代码停止。
我的问题是我是否可以使用一种并发模式来使这个逻辑更优雅?
另外,由于某种原因,当工人数量变得相当大(例如 300000 个工人)时,我目前使用的技术(下面的代码)变得非常低效:对于相同数量的工作,代码NB_WORKERS = 300000 比 NB_WORKERS = 3000 慢 10 倍以上。
非常感谢您!
package main
import (
"math/rand"
"sync"
)
const NB_WORKERS = 3000
const NB_INITIAL_JOBS = 300
const JOB_COUNTING_TO = 10000000
var jobQueue []int
var mu sync.Mutex
var idleWorkerCount int
var isIdle [NB_WORKERS]bool
func doJob(workerId int) {
mu.Lock()
if len(jobQueue) == 0 {
if !isIdle[workerId] {
idleWorkerCount += 1
}
isIdle[workerId] = true
mu.Unlock()
return
}
if isIdle[workerId] {
idleWorkerCount -= 1
}
isIdle[workerId] = false
var job int
job, jobQueue = jobQueue[0], jobQueue[1:]
mu.Unlock()
for i := 0; i < job; i += 1 {
}
if rand.Intn(5) == 0 {
mu.Lock()
jobQueue = append(jobQueue, JOB_COUNTING_TO)
mu.Unlock()
}
}
func main() {
// Filling up the queue with initial jobs
for i := 0; i < NB_INITIAL_JOBS; i += 1 {
jobQueue = append(jobQueue, JOB_COUNTING_TO)
}
var wg sync.WaitGroup
for i := 0; i < NB_WORKERS; i += 1 {
wg.Add(1)
go func(workerId int) {
for idleWorkerCount != NB_WORKERS {
doJob(workerId)
}
wg.Done()
}(i)
}
wg.Wait()
}
【问题讨论】:
-
你能解释一下为什么为每个新作业生成一个新的 goroutine 并使用信号量(如果需要)来控制一次执行的作业数量不是一个好的选择吗?这将允许简单地等待所有 goroutine 完成(此时“队列”隐式为空)。您是否担心内存使用情况?如果是这样,您应该将此作为明确要求。您还应该解释为什么需要这么多工人。工人们具体是做什么的?
-
这就是我决定使用受互斥体保护的共享队列的原因 — 通道是受互斥体保护的队列
-
@blackgreen 我不知道谢谢!正如帖子中所解释的,我遇到的通道问题是,发送到通道是阻塞的,这个算法会产生死锁。
-
@CAFxX 我的第一个实现正是您所建议的:递归调用“go doJob”而不用信号量保护它。一开始效果很好,但是因为要做的工作是某种指数数学过程,所以很快 goroutine 的数量变得令人望而却步(> 500 000)并且程序崩溃了。我试图用缓冲的
chan struct{}保护它,但遇到了我不理解的死锁。 -
@CAFxX:这是一个与我认为你提出的 pastebin.com/mXB76Ft9 相对应的代码,它因为我不明白的原因而死锁
标签: go concurrency