【发布时间】:2021-06-11 22:24:36
【问题描述】:
我想利用Go中的并发将数据发送到goroutine进行处理和计算使用通道。数据点在一个函数中一个接一个地出现,该函数可以是主函数或某些sendData 函数。 如果可能,我希望从主函数发送数据。
我想将数据从 send 函数发送到一个 goroutine,其中数据存储在一个 slice 中(让我们调用这个 goroutine getData)。某些计算是在这个切片上完成的。 达到某个条件后(取决于切片),我希望 goroutine 向 sendData 函数发出信号,表明对某批数据点的处理已完成。 现在,@987654324 @ 函数不断通过通道将数据点发送到 getData goroutine,在那里不断构建新切片,当达到条件时发送信号 - 处理完成,整个序列不断重复。
作为一个例子,让我们假设数字形式的数据正在从sendData 发送到getData。条件是getData 接收到的数字的运行平均值应该等于 4。让我们将以下数字序列作为我们的数据 - []int{3, 2, 3, 8, 2, 1, 1, 1, 15}。在这里,第一批数字应该是{3, 2, 3, 8},因为这些数字按照这个顺序发送给getData后,发现getData收到数字8后,数字的运行均值为4。然后它向sendData 发送一个信号。发送数据的过程再次按数字顺序开始,下一批是{2, 1, 1, 1, 15}。这里,在getData 接收到数字15 后,它发现运行平均值等于4,再次向sendData 发送信号。 (这是一个非常基本的例子——在我的实际用例中,输入数据和条件更复杂。我有数据将在sendData中实时读取。这里每个数据点都是按顺序读取的,但每个数据点在前一点之后几微秒到达。因此这里的数据到达速度很快,我不想在这个函数中做太多的处理和计算。此外,我希望保持并发性,因为在正在读取数据的函数,数据到达的速度很快。而且,我不希望因为处理完成的 goroutine 中的数据处理而错过这里的数据读取。)
这是我尝试构建代码的方式:
package main
import (
"fmt"
)
func main() {
ch := make(chan int)
go sendData(ch)
go getData(ch)
}
func sendData(ch chan int) {
syntheticData := []int{3, 2, 3, 8, 2, 1, 1, 1, 15}
for _, data := range syntheticData {
ch <- data
}
}
func getData(ch chan int) {
dataArr := []int{}
dataArr = append( dataArr, <-ch )
fmt.Println(dataArr)
if mean(dataArr) == 4{
close(ch)
}
}
func sum(array []int) int {
var result int = 0
for _, v := range array {
result += v
}
return result
}
func mean(array []int) float64 {
sumArr := float64(sum(array)) / float64(len(array))
return sumArr
}
我没有用上面的代码实现我想要的功能。 如何在 Go 中实现所需的功能?
【问题讨论】:
标签: go concurrency channel goroutine