【问题标题】:Parallel For-Loop并行 For 循环
【发布时间】:2014-08-05 23:46:30
【问题描述】:

我希望 for 循环使用 go 例程并行。我尝试使用频道,但没有奏效。我的主要问题是,我想在继续之前等待所有迭代完成。这就是为什么在它不起作用之前简单地写go。我尝试使用通道(我认为是错误的方式),但这让我的代码更慢

func createPopulation(populationSize int, individualSize int) []Individual {
    population := make([]Individual, populationSize)

    //i want this loop to be work parallel
    for i := 0; i < len(population); i++ {
        population[i] = createIndividual(individualSize)
    }

    return population
}

func createIndividual(size int) Individual {
    var individual = Individual{make([]bool, size), 0}

    for i := 0; i < len(individual.gene); i++ {
        if rand.Intn(2)%2 == 1 {
            individual.gene[i] = true
        } else {
            individual.gene[i] = false
        }
    }

    return individual
}

我的结构如下所示:

type Individual struct {
    gene []bool
    fitness int
}

【问题讨论】:

  • 您在尝试不同版本时设置了环境变量 GOMAXPROCS 吗?

标签: for-loop concurrency go


【解决方案1】:

所以基本上 goroutine 不应该返回一个值,而是将它推下一个通道。如果你想等待所有的 goroutines 完成,你可以计算 goroutines 的数量,或者使用 WaitGroup。在这个例子中,这是一个矫枉过正的做法,因为大小是已知的,但无论如何这是一个很好的做法。这是一个修改过的例子:

package main

import (
    "math/rand"
    "sync"
)

type Individual struct {
    gene    []bool
    fitness int
}


func createPopulation(populationSize int, individualSize int) []Individual  {

    // we create a slice with a capacity of populationSize but 0 size
    // so we'll avoid extra unneeded allocations
    population := make([]Individual, 0, populationSize)

    // we create a buffered channel so writing to it won't block while we wait for the waitgroup to finish
    ch := make(chan Individual, populationSize)

    // we create a waitgroup - basically block until N tasks say they are done
    wg := sync.WaitGroup{}

    for i := 0; i < populationSize; i++ {

        //we add 1 to the wait group - each worker will decrease it back
        wg.Add(1)

        //now we spawn a goroutine
        go createIndividual(individualSize, ch, &wg)
    }

    // now we wait for everyone to finish - again, not a must.
    // you can just receive from the channel N times, and use a timeout or something for safety
    wg.Wait()

    // we need to close the channel or the following loop will get stuck
    close(ch)

    // we iterate over the closed channel and receive all data from it
    for individual := range ch {

        population = append(population, individual)
    }
    return population

}   

func createIndividual(size int, ch chan Individual, wg *sync.WaitGroup) {

    var individual = Individual{make([]bool, size), 0}

    for i := 0; i < len(individual.gene); i++ {
        if rand.Intn(2)%2 == 1 {
            individual.gene[i] = true
        } else {
            individual.gene[i] = false
        }
    }

    // push the population object down the channel
    ch <- individual
    // let the wait group know we finished
    wg.Done()

}

【讨论】:

  • 很好的答案,但我觉得在这个例子中population := make([]Individual, populationSize) 可能应该是population := make([]Individual, 0),否则append 语句会将新的Individuals放在一个空的长度片段的末尾@ 987654326@。 play.golang.org/p/6eYlk40Oal
  • @Intermernet 你是对的,我错过了。我会修正我的答案。
  • 固定分配具有已知容量但只有0个成员的切片。
  • 这似乎可行,但它在四核上并不快(事实上:它更慢)。你能告诉我为什么吗?
  • @PhilippSander 是的。 1.您需要调用runtime.GOMAXPROC()来获得所需的CPU数量。 2. 通道和生成 goroutine 的开销很大。除非每个 goroutine 的任务长时间运行,否则您不会看到任何收益。
【解决方案2】:

对于您的具体问题,您根本不需要使用渠道。

但是,除非您的 createIndividual 花费一些时间进行计算,否则当并行运行时,goroutine 之间的上下文切换总是会慢得多。

type Individual struct {
    gene    []bool
    fitness int
}

func createPopulation(populationSize int, individualSize int) (population []*Individual) {
    var wg sync.WaitGroup
    population = make([]*Individual, populationSize)

    wg.Add(populationSize)
    for i := 0; i < populationSize; i++ {
        go func(i int) {
            population[i] = createIndividual(individualSize)
            wg.Done()
        }(i)
    }
    wg.Wait()
    return
}

func createIndividual(size int) *Individual {
    individual := &Individual{make([]bool, size), 0}

    for i := 0; i < size; i++ {
        individual.gene[i] = rand.Intn(2)%2 == 1
    }

    return individual
}

func main() {
    numcpu := flag.Int("cpu", runtime.NumCPU(), "")
    flag.Parse()
    runtime.GOMAXPROCS(*numcpu)
    pop := createPopulation(1e2, 21e3)
    fmt.Println(len(pop))
}

输出:

┌─ oneofone@Oa [/tmp]                                                                                                            
└──➜ go build blah.go; xtime ./blah -cpu 1
100
0.13u 0.00s 0.13r 4556kB ./blah -cpu 1
┌─ oneofone@Oa [/tmp]                                                                                                            
└──➜ go build blah.go; xtime ./blah -cpu 4
100
2.10u 0.12s 0.60r 4724kB ./blah -cpu 4

【讨论】:

    【解决方案3】:

    向这样的循环添加受控并行性的一种常见方法是生成许多将从通道读取任务的工作 goroutines。 runtime.NumCPU 函数可能有助于确定产生多少工人是有意义的(确保您适当地设置 GOMAXPROCS 以利用这些 CPU)。然后,您只需将作业写入通道,它们将由工作人员处理。

    在这种情况下,工作是初始化人口切片的元素,因此使用*Individual 指针的通道可能是有意义的。像这样的:

    ch := make(chan *Individual)
    for i := 0; i < nworkers; i++ {
        go initIndividuals(individualSize, ch)
    }
    
    population := make([]Individual, populationSize)
    for i := 0; i < len(population); i++ {
        ch <- &population[i]
    }
    close(ch)
    

    worker goroutine 看起来像这样:

    func initIndividuals(size int, ch <-chan *Individual) {
        for individual := range ch {
            // Or alternatively inline the createIndividual() code here if it is the only call
            *individual = createIndividual(size)
        }
    }
    

    由于任务没有提前分配,createIndividual 花费可变的时间并不重要:每个工人只会在最后一个完成时承担新任务,并在有时退出没有剩余任务(因为此时通道已关闭)。

    但是我们如何知道工作何时完成? sync.WaitGroup 类型可以在这里提供帮助。生成工作 goroutine 的代码可以这样修改:

    ch := make(chan *Individual)
    var wg sync.WaitGroup
    wg.Add(nworkers)
    for i := 0; i < nworkers; i++ {
        go initIndividuals(individualSize, ch, &wg)
    }
    

    initIndividuals函数也被修改为带附加参数,并添加defer wg.Done()作为第一条语句。现在对 wg.Wait() 的调用将阻塞,直到所有工作 goroutine 都完成。然后,您可以返回完全构造的 population 切片。

    【讨论】:

    • for i := 9; i &lt; nworkers; i++ { 为什么用 9 初始化 i?什么是 nworkers?
    • nworkers 是你想要生成的工作 goroutine 的数量。每个工作人员将处理多个任务,因此它应该是GOMAXPROCS(以获得最大并行度)而不是任务总数。
    • for i := 0; i &lt; nworkers; i++ { 那么这没有任何意义
    • 哪一部分对您来说没有意义?这是一个循环,可以生成许多工作 goroutine 以达到所需的并行度。
    【解决方案4】:

    如果您想避免将并发逻辑与业务逻辑混为一谈,我编写了这个库https://github.com/shomali11/parallelizer 来帮助您。它封装了并发逻辑,所以你不用担心。

    所以在你的例子中:

    package main
    
    import (
        "github.com/shomali11/parallelizer"
        "fmt"
    )
    
    func main() {
        populationSize := 100
        results = make([]*Individual, populationSize)
    
        options := &Options{ Timeout: time.Second }
        group := parallelizer.NewGroup(options)
        for i := 0; i < populationSize; i++ {
            group.Add(func(index int, results *[]*Individual) {
                return func () {
                    ...
    
                    results[index] = &Individual{...}
                }
            }(i, &results))
        }
    
        err := group.Run()
    
        fmt.Println("Done")
        fmt.Println(fmt.Sprintf("Results: %v", results))
        fmt.Printf("Error: %v", err) // nil if it completed, err if timed out
    }
    

    【讨论】:

      【解决方案5】:

      由于您事先知道您将拥有多少个人,我会避免使用频道,而只是在 goroutine createIndividual 中分配 population 的个人成员。 createIndividual 的签名将如下所示:

      func createIndividual(wg *sync.WaitGroup, individual *Individual, size int) 
      

      调用代码如下所示:

      population := make([]Individual, populationSize)
      wg := &sync.WaitGroup{}
      wg.Add(len(population))
      
      for i := 0; i < len(population); i++ {
          go createIndividual(wg, &population[i], individualSize)
      }
      
      wg.Wait()
      

      所以,每个 goroutine 只负责一个人,它分配给population 中的相应槽:

      func createIndividual(wg *sync.WaitGroup, individual *Individual, size int) {
          defer wg.Done()
          *individual = Individual{make([]bool, size), 0}
      
          // assign other attributes to `individual`
      }
      

      您可以查看完整的代码示例on play here

      【讨论】:

      • 这肯定行得通,但有人可能会争辩说它违背了 Go 的“通过通信共享内存”的习语。
      • @Not_a_Golfer 是的,但是由于这似乎是一个非常小的和孤立的问题,我认为为了性能和可读性这样做是可以的。但这当然取决于createIndividual 最终会做什么。最终可能会证明通道解决方案实际上更快。
      • 我什至不确定,因为所有这些东西都是 100% CPU 并且非常快(每个 goroutine),并行执行将比在单个循环中执行要快。取决于我猜的“个人大小”。没有频道会对性能产生很大的影响,但你知道,这就是像 Heartbleed 这样的东西诞生的原因:)
      • 哦,拜托,这个解决方案实际上比通道解决方案更具可读性和可理解性(更少的代码,更少的抽象),所以我可以说在这种情况下使用通道是什么使“像 Heartbleed 这样的事情”发生。这个论点是错误的。
      • 唯一的问题是您正在传递指向预分配数组中位置的指针。当代码变得更复杂时,竞争条件和踩到指针的机会就会增加。通道提供的隔离可以防止这种情况。就是这样。
      猜你喜欢
      • 2017-04-18
      • 2016-08-10
      • 2013-12-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多