【问题标题】:How to receive data and send back signal using goroutines如何使用 goroutines 接收数据并发送回信号
【发布时间】:2021-06-11 22:24:36
【问题描述】:

我想利用Go中的并发将数据发送到goroutine进行处理和计算使用通道。数据点在一个函数中一个接一个地出现,该函数可以是主函数或某些sendData 函数。 如果可能,我希望从主函数发送数据。

我想将数据从 send 函数发送到一个 goroutine,其中数据存储在一个 slice 中(让我们调用这个 goroutine getData)。某些计算是在这个切片上完成的。 达到某个条件后(取决于切片),我希望 goroutine 向 sendData 函数发出信号,表明对某批数据点的处理已完成。 现在,@987654324 @ 函数不断通过通道将数据点发送到 getData goroutine,在那里不断构建新切片,当达到条件时发送信号 - 处理完成,整个序列不断重复。

作为一个例子,让我们假设数字形式的数据正在从sendData 发送到getData。条件是getData 接收到的数字的运行平均值应该等于 4。让我们将以下数字序列作为我们的数据 - []int{3, 2, 3, 8, 2, 1, 1, 1, 15}。在这里,第一批数字应该是{3, 2, 3, 8},因为这些数字按照这个顺序发送给getData后,发现getData收到数字8后,数字的运行均值为4。然后它向sendData 发送一个信号。发送数据的过程再次按数字顺序开始,下一批是{2, 1, 1, 1, 15}。这里,在getData 接收到数字15 后,它发现运行平均值等于4,再次向sendData 发送信号。 (这是一个非常基本的例子——在我的实际用例中,输入数据和条件更复杂。我有数据将在sendData中实时读取。这里每个数据点都是按顺序读取的,但每个数据点在前一点之后几微秒到达。因此这里的数据到达速度很快,我不想在这个函数中做太多的处理和计算。此外,我希望保持并发性,因为在正在读取数据的函数,数据到达的速度很快。而且,我不希望因为处理完成的 goroutine 中的数据处理而错过这里的数据读取。)

这是我尝试构建代码的方式:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int)
    go sendData(ch)
    go getData(ch)
}

func sendData(ch chan int) {
    syntheticData := []int{3, 2, 3, 8, 2, 1, 1, 1, 15}
    for _, data := range syntheticData {
            ch <- data
        }
}

func getData(ch chan int) {
        dataArr := []int{}
        dataArr = append( dataArr, <-ch )
        fmt.Println(dataArr)

        if mean(dataArr) == 4{
          close(ch)
        }

}

func sum(array []int) int {
    var result int = 0
    for _, v := range array {
        result += v
    }

    return result
}


func mean(array []int) float64 {
    sumArr := float64(sum(array)) / float64(len(array))
    return sumArr
}

我没有用上面的代码实现我想要的功能。 如何在 Go 中实现所需的功能?

【问题讨论】:

    标签: go concurrency channel goroutine


    【解决方案1】:

    如果我正确理解了您的问题,这应该可以正常工作。但不是一个好的解决方案。我已将代码注释掉,以便于理解。

    package main
    
    import "fmt"
    
    func main() {
        // Data channel
        ch := make(chan int)
        // Batch close/create channel for signalling
        bsig := make(chan struct{})
    
        go put(ch, bsig)
        get(ch, bsig, 4)
    }
    
    func put(ch chan<- int, bsig chan struct{}) {
        // Listen for batch creation
        go func() {
            n := 1
            // Receives signal from get on the bidrectional
            // channel, and ping back when done on the same
            // channel.
            for range bsig {
                fmt.Printf("Batch #%d\n", n)
                bsig <- struct{}{}
                n++
            }
        }()
    
        // Steam of numbers
        stream := []int{3, 2, 3, 8, 2, 1, 1, 1, 15}
        for _, s := range stream {
            ch <- s
        }
        close(ch)
    }
    
    func get(ch <-chan int, bsig chan struct{}, mean int) {
        num := 0
        sum := 0
        batch := make([]int, 0)
        for s := range ch {
            sum += s
            num++
            batch = append(batch, s)
            // Matches the mean, close batch
            // and signal the sender
            if sum/num == mean {
                // Send ping
                bsig <- struct{}{}
                // Wait for pong?
                <-bsig
                // Print the batch
                fmt.Println(batch)
                // Reset the batch
                sum = 0
                num = 0
                batch = batch[:0]
            }
        }
        // Close the channel
        close(bsig)
    }
    

    【讨论】:

      【解决方案2】:

      你只需要一个额外的接收goroutine,例如getData 然后main goroutine 将在数据到达时使用名为ch 的通道发送数据,您需要一个 缓冲 通道来发送信号,例如batchCompletedWaitGroup 等待 getData 同步,当它完成
      就是这样 ,试试it

      package main
      
      import (
          "fmt"
          "sync"
      )
      
      func main() {
          wg := &sync.WaitGroup{}
          ch := make(chan int)
          batchCompleted := make(chan struct{}, 1) // non-blocking signaling channel
          wg.Add(1)
          go getData(ch, batchCompleted, wg)
      
          syntheticData := []int{3, 2, 3, 8, 2, 1, 1, 1, 15}
          i := 0
          check := func() {
              select {
              case <-batchCompleted:
                  i++
                  fmt.Println(i, " batch completed")
              default:
              }
          }
          for _, data := range syntheticData {
              ch <- data
              check()
          }
          close(ch)
          wg.Wait()
          check()
      }
      
      func getData(ch chan int, batchCompleted chan struct{}, wg *sync.WaitGroup) {
          defer wg.Done()
          a := []int{}
          sum, n := 0, 0
          for v := range ch {
              sum += v
              n++
              a = append(a, v)
              if sum == 4*n {
                  batchCompleted <- struct{}{}
                  fmt.Println(a)
                  sum, n = 0, 0
                  a = a[:0]
              }
          }
          if len(a) > 0 {
              fmt.Println("remaining data:", a)
          }
      }
      

      输出:

      [3 2 3 8]
      1  batch completed
      [2 1 1 1 15]
      2  batch completed
      

      【讨论】:

      • 出色的答案。这可以完成工作。谢谢你。 :)
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2013-01-17
      • 2014-08-17
      • 2022-01-09
      • 2014-03-27
      • 2018-06-23
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多