【问题标题】:Wait for the termination of n goroutines等待 n 个 goroutine 终止
【发布时间】:2023-03-10 06:52:01
【问题描述】:

我需要启动大量的 goroutine 并等待它们终止。直观的方式似乎是使用一个通道来等待它们全部完成:

package main

type Object struct {
    //data
}

func (obj *Object) Update(channel chan int) {
    //update data
    channel <- 1
    return
}

func main() {

    channel := make(chan int, n)
    list := make([]Object, n, m)
    for {
        for _, object := range list {
            go object.Update(channel)
        }
        for i := 0; i < n; i++ {
            <-channel
        }
        //now everything has been updated. start again
    }
}

但问题在于对象的数量以及 goroutine 的数量可能会发生变化。是否可以更改通道的缓冲区大小?

是否有更优雅的方式来做到这一点?

【问题讨论】:

标签: concurrency go channel coroutine goroutine


【解决方案1】:

我使用WaitGroup 来解决这个问题。翻译你当前的代码,用一些日志来说明发生了什么:

package main

import "sync"
import "fmt"
import "time"

type Object struct {
    //data
}

func (obj *Object) Update(wg *sync.WaitGroup) {
    //update data
    time.Sleep(time.Second)
    fmt.Println("Update done")
    wg.Done()
    return
}

func main() {
    var wg sync.WaitGroup
    list := make([]Object, 5)
    for {
        for _, object := range list {
            wg.Add(1)
            go object.Update(&wg)
        }
        //now everything has been updated. start again
        wg.Wait()
        fmt.Println("Group done")
    }
}

【讨论】:

  • 不错的答案!我可能会将defer wg.Done() 放在Update 的开头,但以防万一函数在未来某个时间增长并获得早期回报。
  • 或者万一出现恐慌什么的。
【解决方案2】:

这个任务并不简单,写一个有问题的任务很容易。我建议在标准库中使用现成的解决方案 - sync.WaitGroup。引用链接:

WaitGroup 等待一组 goroutine 完成。主 goroutine 调用 Add 来设置要等待的 goroutine 的数量。然后每个 goroutine 运行并在完成时调用 Done。同时,Wait 可以用来阻塞,直到所有的 goroutine 都完成。

【讨论】:

  • 如果事先不知道要等待的 goroutine 的数量?
  • @Dfr 会在启动每个 goroutine 时递增计数器,因此当您不知道要启动的 goroutine 数量时,此解决方案仍然是最佳解决方案。
【解决方案3】:

@tjameson 很好地解释了如何使用WaitGroup,如何将WaitGroup 对象的引用传递给您的函数。我将对他的示例进行的一项更改是利用defer 当您是Done 时。我认为这个defer ws.Done() 应该是你函数中的第一条语句。

我喜欢WaitGroup 的简洁。但是,我不喜欢我们需要将引用传递给 goroutine,因为这意味着并发逻辑将与您的业务逻辑混合。

所以我想出了这个通用函数来为我解决这个问题:

// Parallelize parallelizes the function calls
func Parallelize(functions ...func()) {
    var waitGroup sync.WaitGroup
    waitGroup.Add(len(functions))

    defer waitGroup.Wait()

    for _, function := range functions {
        go func(copy func()) {
            defer waitGroup.Done()
            copy()
        }(function)
    }
}

所以你的例子可以这样解决:

type Object struct {
    //data
}

func (obj *Object) Update() {
    //update data
    time.Sleep(time.Second)
    fmt.Println("Update done")
    return
}

func main() {
    functions := []func(){}
    list := make([]Object, 5)
    for _, object := range list {
        function := func(obj Object){ object.Update() }(object)
        functions = append(functions, function)
    }

    Parallelize(functions...)        

    fmt.Println("Group done")
}

如果你想使用它,你可以在这里找到它https://github.com/shomali11/util

【讨论】:

    猜你喜欢
    • 2014-03-17
    • 1970-01-01
    • 2018-04-05
    • 1970-01-01
    • 2020-04-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-24
    相关资源
    最近更新 更多