【问题标题】:Create goroutine and set maximum goroutines创建 goroutine 并设置最大 goroutine
【发布时间】:2019-06-26 02:26:55
【问题描述】:

我正在学习 Golang,但遇到了一些困难。我已经研究过 Google,但没有任何进展。

我编写了一个代码,通过多个服务器的 ICMP 检查 RTT。

它有这个结构:

type Server struct {
    id  uint
    ip  string
    rtt time.Duration
}

它有一个包含多个服务器的切片(它是一个数组吗?)。 对于此切片中的每个服务器,我调用函数 getRTT 返回 RTT,然后将其存储在 Server.rtt,因为在 for 循环之后,我想打印所有服务器及其各自的 RTT。

for _, server := range servers {
    server.rtt = getRTT(server.ip) / time.Millisecond
}
fmt.Println(servers)

同步的问题,所以一一发送“ping”。我想让这个异步并限制最大的 goroutines。示例:一次调用 20 getRTT。

我正在阅读有关 goroutines、maxgroup、channel 的信息,但到目前为止还没有。

【问题讨论】:

  • 工人函数在哪里?结果存储在哪里?
  • 好的!我将继续研究这个,因为这对我来说还不清楚。还是谢谢你!

标签: go


【解决方案1】:

Go 中有很多模式可以为 goroutine 设置阈值。我的最爱之一是使用管道。在管道模式中,您创建一组运行的 goroutine 并将结构作为工作传递给它们。

以下代码是管道的说明性示例。请注意,您必须提供一种同步方式来等待 goroutine 终止,例如使用 sync.WaitGroup

package main

import "fmt"

type handler struct {
        workStream chan int
}

func (h handler) handle() {
        for w := range h.workStream {
                fmt.Printf("do some work with %d\n", w)
        }
}

func main() {
        h := handler{
                workStream: make(chan int),
        }

        // run goroutines as much as you want
        for i := 0; i < 5; i++ {
                go h.handle()
        }

        for i := 0; i < 1000; i++ {
                h.workStream <- i
        }

        close(h.workStream) // by closing this channel all goroutines all killed

        // TODO: wait for all goroutines to die
}

【讨论】:

  • 只有一个频道,对吧?此频道适用于 5 个 goroutine?
  • 你可以在这里把通道想象成一个队列——每个工人从队列中取出最上面的东西,所以是的,只需要一个
【解决方案2】:

启动 20 个 goroutine 来完成这项工作。使用通道将工作分配给这些 goroutine。等待 goroutine 完成。

// c is channel for sending *Server values to worker goroutines.
c := make(chan *Server)

// Start worker goroutines. Each goroutine receives 
// values from c in a loop. The loop breaks when c
// is closed.
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
    wg.Add(1)
    go func() {
       for server := range c {
           server.rtt = getRTT(server.ip)
       }
       wg.Done()
    }()
}

// Feed work to the goroutines.
for _, server := range servers {
    c <- server
}

// Close channel to cause workers to break out of 
// for loop.
close(c)

// Wait for the workers to complete.
wg.Wait()

fmt.Println(servers)

【讨论】:

    【解决方案3】:

    我喜欢为此使用一个简单的计数信号量,并结合一个 sync.WaitGroup 以确保按照@Parham Alvani 的建议完成。 (注意@Parham Alvani 的解决方案至少和这个一样正确)

    (一些解释 - 我们用缓冲区创建一个通道 - 缓冲区大小变成了 goroutine 允许并发执行的数量。每个 goroutine 将一些东西放入通道中,然后将其读回。在下面的代码的情况下,第五次,goroutine 被阻塞添加到通道,直到另一个 goroutine 从通道中取出一些东西。)

    我还让“getRTT”函数在指向服务器的指针上工作,因为我们在这里修改接收器。

    这里的游乐场: https://play.golang.org/p/8Rmp0kHoNFB

    package main
    
    import (
        "fmt"
        "time"
        "sync"
        "math/rand"
    )
    
    
    type Server struct {
        id  uint
        ip  string
        rtt time.Duration
    }
    
    
    func (s *Server) setRTT()  {  
        fmt.Printf("setting rtt for id %d\n", s.id) 
        // do something that takes a while
        sleepyTime := time.Second * time.Duration(rand.Intn(5))
        time.Sleep(sleepyTime)
        s.rtt = sleepyTime
    }
    
    func main() {
    
        servers := []Server{
           {1,"10.10.10.0",0},
           {2,"10.10.10.1",0},
           {3,"10.10.10.2",0},
           {4,"10.10.10.3",0},
           {5,"10.10.10.4",0},
           {6,"10.10.10.5",0},
           {7,"10.10.10.0",0},
           {8,"10.10.10.1",0},
           {9,"10.10.10.2",0},
           {10,"10.10.10.3",0},
           {11,"10.10.10.4",0},
           {12,"10.10.10.5",0},
           {13,"10.10.10.0",0},
           {14,"10.10.10.1",0},
           {15,"10.10.10.2",0},
           {16,"10.10.10.3",0},
    
        }
        semaphore := make(chan struct{}, 4) // limit concurrency simply, you likely want a larger number than 4 here
        var wg sync.WaitGroup // necessary to ensure we complete everything - otherwise main will exit before we are done
    
        wg.Add(len(servers)) 
    
        for i := range servers {
            go func(s *Server) {
                defer wg.Done()
                semaphore <- struct{}{} // put something in channel, will block when > 4
                defer func() { <-semaphore }() // remove something from channel as this goroutine completes, allowing another goroutine to continue
                s.setRTT() 
            }(&servers[i])
        }
        wg.Wait() // wait for it!
        fmt.Println(servers)
    }
    

    示例输出:

    setting rtt for id 16
    setting rtt for id 1
    setting rtt for id 2
    setting rtt for id 3
    setting rtt for id 4
    setting rtt for id 5
    setting rtt for id 6
    setting rtt for id 7
    setting rtt for id 8
    setting rtt for id 9
    setting rtt for id 10
    setting rtt for id 11
    setting rtt for id 12
    setting rtt for id 13
    setting rtt for id 14
    setting rtt for id 15
    [{1 10.10.10.0 2000000000} {2 10.10.10.1 2000000000} {3 10.10.10.2 4000000000} {4 10.10.10.3 1000000000} {5 10.10.10.4 3000000000} {6 10.10.10.5 0} {7 10.10.10.0 0} {8 10.10.10.1 1000000000} {9 10.10.10.2 0} {10 10.10.10.3 4000000000} {11 10.10.10.4 1000000000} {12 10.10.10.5 2000000000} {13 10.10.10.0 4000000000} {14 10.10.10.1 3000000000} {15 10.10.10.2 4000000000} {16 10.10.10.3 1000000000}]
    

    【讨论】:

    • 还要注意范围的更改,使用索引而不是范围变量 - 范围变量将动态生成(复制),并且更改只会更改循环变量,而不是成员的切片。我可能没有很好地解释这一点,但这是一个很大的问题。
    • 不管你在频道上放什么? semaphore &lt;- struct{}{}
    • 不,但是做一个空结构体在内存大小方面是一种不错的选择,但是一个布尔值也可以。
    • 谢谢解释!我虽然通道仅用于在 goroutines 之间传递信息
    猜你喜欢
    • 2012-01-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-12-07
    • 2017-01-10
    相关资源
    最近更新 更多