【问题标题】:Job queue where workers can add jobs, is there an elegant solution to stop the program when all workers are idle?工作人员可以添加工作的工作队列,是否有一个优雅的解决方案可以在所有工作人员空闲时停止程序?
【发布时间】:2021-10-24 15:13:45
【问题描述】:

我发现自己有一个工作队列,工作人员可以在处理完一个工作后添加新工作。

为了说明,在下面的代码中,一个作业包括最多计数 JOB_COUNTING_TO 和随机 1/5 的时间,一个工人将一个新作业添加到队列中。

因为我的工作人员可以将作业添加到队列中,所以据我了解,我无法将通道用作我的作业队列。这是因为发送到通道是阻塞的,即使使用缓冲通道,此代码由于其递归性质(作业添加作业)很容易达到所有工作人员都发送到通道并且没有工作人员可用的情况收到。

这就是我决定使用受互斥体保护的共享队列的原因。

现在,我希望程序在所有工作人员都空闲时停止。不幸的是,这不能仅仅通过寻找 len(jobQueue) == 0 的时间来发现,因为队列可能是空的,但一些工作人员仍在做他们的工作,之后可能会添加一个新工作。

我想出的解决方案是,我觉得有点笨拙,它利用变量var idleWorkerCount intvar isIdle [NB_WORKERS]bool 来跟踪空闲的worker,当idleWorkerCount == NB_WORKERS 时代码停止。

我的问题是我是否可以使用一种并发模式来使这个逻辑更优雅?

另外,由于某种原因,当工人数量变得相当大(例如 300000 个工人)时,我目前使用的技术(下面的代码)变得非常低效:对于相同数量的工作,代码NB_WORKERS = 300000NB_WORKERS = 3000 慢 10 倍以上。

非常感谢您!

package main

import (
    "math/rand"
    "sync"
)

const NB_WORKERS = 3000
const NB_INITIAL_JOBS = 300
const JOB_COUNTING_TO = 10000000

var jobQueue []int
var mu sync.Mutex
var idleWorkerCount int
var isIdle [NB_WORKERS]bool

func doJob(workerId int) {

    mu.Lock()

    if len(jobQueue) == 0 {
        if !isIdle[workerId] {
            idleWorkerCount += 1
        }
        isIdle[workerId] = true
        mu.Unlock()
        return
    }

    if isIdle[workerId] {
        idleWorkerCount -= 1
    }
    isIdle[workerId] = false

    var job int
    job, jobQueue = jobQueue[0], jobQueue[1:]
    mu.Unlock()

    for i := 0; i < job; i += 1 {
    }

    if rand.Intn(5) == 0 {
        mu.Lock()
        jobQueue = append(jobQueue, JOB_COUNTING_TO)
        mu.Unlock()
    }

}

func main() {

    // Filling up the queue with initial jobs
    for i := 0; i < NB_INITIAL_JOBS; i += 1 {
        jobQueue = append(jobQueue, JOB_COUNTING_TO)
    }

    var wg sync.WaitGroup
    for i := 0; i < NB_WORKERS; i += 1 {
        wg.Add(1)
        go func(workerId int) {
            for idleWorkerCount != NB_WORKERS {
                doJob(workerId)
            }
            wg.Done()
        }(i)
    }
    wg.Wait()
}

【问题讨论】:

  • 你能解释一下为什么为每个新作业生成一个新的 goroutine 并使用信号量(如果需要)来控制一次执行的作业数量不是一个好的选择吗?这将允许简单地等待所有 goroutine 完成(此时“队列”隐式为空)。您是否担心内存使用情况?如果是这样,您应该将此作为明确要求。您还应该解释为什么需要这么多工人。工人们具体是做什么的?
  • 这就是我决定使用受互斥体保护的共享队列的原因 — 通道受互斥体保护的队列
  • @blackgreen 我不知道谢谢!正如帖子中所解释的,我遇到的通道问题是,发送到通道是阻塞的,这个算法会产生死锁。
  • @CAFxX 我的第一个实现正是您所建议的:递归调用“go doJob”而不用信号量保护它。一开始效果很好,但是因为要做的工作是某种指数数学过程,所以很快 goroutine 的数量变得令人望而却步(> 500 000)并且程序崩溃了。我试图用缓冲的chan struct{} 保护它,但遇到了我不理解的死锁。
  • @CAFxX:这是一个与我认为你提出的 pastebin.com/mXB76Ft9 相对应的代码,它因为我不明白的原因而死锁

标签: go concurrency


【解决方案1】:

因为我的工人可以将作业添加到队列中

重入通道总是死锁。使用此代码很容易演示

package main

import (
    "fmt"
)

func main() {

    out := make(chan string)
    c := make(chan string)
    go func() {
        for v := range c {
            c <- v + " 2"
            out <- v
        }
    }()

    go func() {
        c <- "hello world!" // pass OK
        c <- "hello world!" // no pass, the routine is blocking at pushing to itself
    }()

    for v := range out {
        fmt.Println(v)
    }

}

在节目中

  • 尝试推送c &lt;- v + " 2"

不可能

  • 阅读for v := range c {
  • 推送c &lt;- "hello world!"
  • 阅读for v := range out {

因此,它会死锁。

如果你想通过那种情况,你必须在某个地方溢出。

在例行公事上,或其他地方。

package main

import (
    "fmt"
    "time"
)

func main() {

    out := make(chan string)
    c := make(chan string)
    go func() {
        for v := range c {
            go func() { // use routines on the stack as a bank for the required overflow.
                <-time.After(time.Second) // simulate slowliness.
                c <- v + " 2"
            }()
            out <- v
        }
    }()

    go func() {
        for {
            c <- "hello world!"
        }
    }()

    exit := time.After(time.Second * 60)
    for v := range out {
        fmt.Println(v)
        select {
        case <-exit:
            return
        default:
        }
    }
}

但现在你有一个新问题。

您通过在堆栈上无限制地溢出来制造内存炸弹。从技术上讲,这取决于完成工作所需的时间、可用内存、CPU 的速度和数据的形状(它们可能会或可能不会生成新工作)。所以有一个上限,但很难理解,实际上这最终会成为炸弹。

考虑在没有限制的堆栈上不溢出。

如果您手头没有任何任意限制,您可以使用信号量来限制溢出。

https://play.golang.org/p/5JWPQiqOYKz

我的炸弹在工作超时 1s 和 2s 时没有爆炸,但它们占用了大量内存。

在修改代码的另一轮中,它爆炸了

当然,因为您在代码中使用了if rand.Intn(5) == 0 {,所以问题在很大程度上得到了缓解。不过,当您遇到这种模式时,请三思而后行。

另外,由于某种原因,当工人数量变得相当大(例如 300000 个工人)时,我目前使用的技术(下面的代码)变得非常低效:对于相同数量的工作,代码NB_WORKERS = 300000 与 NB_WORKERS = 3000 相比,将慢 10 倍以上。

总体而言,您的 CPU 周期数是有限的。所有这些分配和指令,产生和同步,也必须执行。并发不是免费的。

现在,我希望程序在所有工作人员都空闲时停止。

我想出了这个想法,但我发现很难推理和说服自己它不会以write on closed channel 恐慌告终。

这个想法是使用sync.WaitGroup 来计算飞行项目并依靠它来正确关闭输入通道并完成工作。

package main

import (
    "log"
    "math/rand"
    "sync"
    "time"
)

func main() {

    rand.Seed(time.Now().UnixNano())

    var wg sync.WaitGroup
    var wgr sync.WaitGroup
    out := make(chan string)
    c := make(chan string)
    go func() {
        for v := range c {
            if rand.Intn(5) == 0 {
                wgr.Add(1)
                go func(v string) {
                    <-time.After(time.Microsecond)
                    c <- v + " 2"
                }(v)
            }
            wgr.Done()
            out <- v
        }
        close(out)
    }()

    var sent int
    wg.Add(1)
    go func() {
        for i := 0; i < 300; i++ {
            wgr.Add(1)
            c <- "hello world!"
            sent++
        }
        wg.Done()
    }()

    go func() {
        wg.Wait()
        wgr.Wait()
        close(c)
    }()

    var rcv int
    for v := range out {
        // fmt.Println(v)
        _ = v
        rcv++
    }
    log.Println("sent", sent)
    log.Println("rcv", rcv)
}

我用while go run -race .; do :; done 运行它,它在合理的迭代次数下运行良好。

【讨论】:

  • 抱歉回复晚了!非常感谢您非常透彻的分析。我设法通过每个 go-routine 批处理超过 1 个作业来解决我的问题。谢谢!
猜你喜欢
  • 1970-01-01
  • 2015-07-04
  • 2015-08-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多