【问题标题】:go routine wait for response from channel and continuegoroutine 等待频道的响应并继续
【发布时间】:2020-08-23 11:26:19
【问题描述】:

我正在学习并发,我想实现一个简单的示例,该示例从矩阵中获取行并将值的数组(切片)添加到每一行。

由于我使用的是通道,因此我尝试等待每一行从 goroutine 获得相应的结果。但是,这并不比仅同步执行此操作更好。如何让每一行等待各自的结果,并允许其他行同时计算它们的结果?

https://play.golang.org/p/uCOGwOBeIQL


package main

import "fmt"


/*
Array:
0 1 2 3 4 5 6 7 8 9

+

Matrix:
1 0 0 0 0 0 0 0 0 0
0 1 0 0 0 0 0 0 0 0
0 0 1 0 0 0 0 0 0 0
0 0 0 1 0 0 0 0 0 0
0 0 0 0 1 0 0 0 0 0
0 0 0 0 0 1 0 0 0 0
0 0 0 0 0 0 1 0 0 0
0 0 0 0 0 0 0 1 0 0
0 0 0 0 0 0 0 0 1 0
0 0 0 0 0 0 0 0 0 1

-> 
Expected result:
1 1 2 3 4 5 6 7 8 9
0 2 2 3 4 5 6 7 8 9
0 1 3 3 4 5 6 7 8 9
0 1 2 4 4 5 6 7 8 9
0 1 2 3 5 5 6 7 8 9
0 1 2 3 4 6 6 7 8 9
0 1 2 3 4 5 7 7 8 9
0 1 2 3 4 5 6 8 8 9
0 1 2 3 4 5 6 7 9 9
0 1 2 3 4 5 6 7 8 10
*/
func main() {
    numbers := []int {0,1,2,3,4,5,6,7,8,9}

    matrix := [][]int{
        {1,0,0,0,0,0,0,0,0,0},
        {0,1,0,0,0,0,0,0,0,0},
        {0,0,1,0,0,0,0,0,0,0},
        {0,0,0,1,0,0,0,0,0,0},
        {0,0,0,0,1,0,0,0,0,0},
        {0,0,0,0,0,1,0,0,0,0},
        {0,0,0,0,0,0,1,0,0,0},
        {0,0,0,0,0,0,0,1,0,0},
        {0,0,0,0,0,0,0,0,1,0},
        {0,0,0,0,0,0,0,0,0,1},
    }

    rmatrix := make([][]int, 10)

    for i, row := range matrix {
        cResult := make(chan []int)
        go func(row []int, numbers []int, c chan <- []int) {
            c <- addRow(row,numbers)
        }(row,numbers,cResult)

        //this read from the channel will block until the goroutine sends its result over the channel
        rmatrix[i] = <- cResult
    }
    fmt.Println(rmatrix)
}

func addRow(row []int, numbers []int) []int{
    result := make([]int, len(row))
    for i,e := range row {
        result[i] = e + numbers[i];
    }
    return result
}

【问题讨论】:

    标签: go concurrency goroutine


    【解决方案1】:

    这个例子产生了较少数量的 goroutine,并且也保证了正确的顺序,不管哪个 goroutine 先完成了它的处理。

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    type rowRes struct {
        index  int
        result *[]int
    }
    
    func addRow(index int, row []int, numbers []int) rowRes {
        result := make([]int, len(row))
        for i, e := range row {
            result[i] = e + numbers[i]
        }
        return rowRes{
            index:  index,
            result: &result,
        }
    }
    
    func main() {
        numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
    
        matrix := [][]int{
            {1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
            {0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
            {0, 0, 1, 0, 0, 0, 0, 0, 0, 0},
            {0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
            {0, 0, 0, 0, 1, 0, 0, 0, 0, 0},
            {0, 0, 0, 0, 0, 1, 0, 0, 0, 0},
            {0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
            {0, 0, 0, 0, 0, 0, 0, 1, 0, 0},
            {0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
            {0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
        }
        rmatrix := make([][]int, 10)
    
        // Buffered channel
        rowChan := make(chan rowRes, 10)
    
        wg := sync.WaitGroup{}
    
        // Reciever goroutine
        go recv(rowChan, rmatrix)
    
        for i := range matrix {
            wg.Add(1)
            go func(index int, row []int, w *sync.WaitGroup) {
                rowChan <- addRow(index, row, numbers)
                w.Done()
            }(i, matrix[i], &wg)
        }
        wg.Wait()
        close(rowChan)
        fmt.Println(rmatrix)
    }
    
    func recv(res chan rowRes, rmatrix [][]int) {
        for {
            select {
            case k, ok := <-res:
                if !ok {
                    return
                }
                rmatrix[k.index] = *k.result
            }
        }
    }
    

    【讨论】:

      【解决方案2】:

      我需要使用sync.WaitGroup 并直接分配调用结果(以确保它们返回到索引行)。谢谢@彼得

      package main
      
      import (
          "fmt"
          "sync"
      )
      
      func main() {
          numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
      
          matrix := [][]int{
              {1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
              {0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
              {0, 0, 1, 0, 0, 0, 0, 0, 0, 0},
              {0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
              {0, 0, 0, 0, 1, 0, 0, 0, 0, 0},
              {0, 0, 0, 0, 0, 1, 0, 0, 0, 0},
              {0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
              {0, 0, 0, 0, 0, 0, 0, 1, 0, 0},
              {0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
              {0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
          }
      
          rmatrix := make([][]int, 10)
          var waitGroup sync.WaitGroup
      
          for i, row := range matrix {
              waitGroup.Add(1)
              go func(i int, row []int) {
                  rmatrix[i] = addRow(row, numbers)
                  waitGroup.Done()
              }(i, row)
          }
          waitGroup.Wait()
          fmt.Println(rmatrix)
      }
      
      func addRow(row []int, numbers []int) []int {
          result := make([]int, len(row))
          for i, e := range row {
              result[i] = e + numbers[i]
          }
          return result
      }
      
      

      【讨论】:

      【解决方案3】:

      管道方法

      
      taskChannel := make(chan string,1000); // Set up the task queue
      wg := sync.WaitGroup
      
      // Task release
      wg.add(1)
      go func(&wg,taskChannel) {
            defer wg.Down()
            for i in "task list" {
              taskChannel <- "Stuff the characters you want to deal with here"
            }
      
          // After the task is sent and closed
          close(taskChannel)
      }(wg *sync.WaitGroup,taskChannel chan string)
      
      // Task execution
      go func(&wg,taskChannel,1000) {
          defer wg.Down()
          limit := make(chan bool,limitNumber); // Limit the number of concurrent
          tg := sync.WaitGroup
          loop:
          for {
            select {
            case task,over := <-taskChannel:
                  if !over {  // If there are no more tasks, quit
                      tg.Wait()  // Wait for all tasks to end
                      break loop
                  }
      
                  tg.Add(1)
                  limit<-true
                  go func(&tg,limitm) {
                      defer func() {
                          <-limit
                          tg.Down()
                      }
                      // Business processing logic, processing tasks
                  }(tg *sync.WaitGroup,limit chan bool,task string)
            }
          }
      }(wg *sync.WaitGroup,taskChannel chan string,limitNumber int)
      
      wg.Wait()
      

      希望能帮到你

      【讨论】:

      • 此代码与问题无关,不保留输入的顺序,对单个案例使用愚蠢的选择,使用 bool 而不是惯用的空结构,并且不在第一名。
      猜你喜欢
      • 2018-08-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-01-17
      • 2020-01-18
      • 2014-07-12
      相关资源
      最近更新 更多