【问题标题】:How to implement a timeout when using sync.WaitGroup.wait?使用sync.WaitGroup.wait时如何实现超时?
【发布时间】:2015-12-21 11:52:18
【问题描述】:

我遇到了一种情况,我想跟踪一些 goroutine 以在特定点上同步,例如当所有 url 都被获取时。然后,我们可以将它们全部放置并按特定顺序显示。

我认为这是障碍进来的。它在gosync.WaitGroup 中。但是,在实际情况下,我们不能确保所有的 fetch 操作都会在短时间内成功。所以,我想在wait 为获取操作引入超时。

我是Golang的新手,谁能给我一些建议?


我要找的是这样的:

   wg := &sync.WaigGroup{}
   select {
   case <-wg.Wait():
   // All done!
   case <-time.After(500 * time.Millisecond):
   // Hit timeout.
   }

我知道Wait 不支持Channel

【问题讨论】:

  • 您能否发布您如何添加等待组之类的内容,例如您是通过循环还是这样做??

标签: go timeout


【解决方案1】:

如果您想要的只是整洁的选择,您可以通过生成一个调用方法并在完成后关闭/发送通道的例程来轻松地将阻塞函数转换为通道。

done := make(chan struct{})
go func() {
   wg.Wait()
   close(done)
}()

select {
case <-done:
// All done!
case <-time.After(500 * time.Millisecond):
// Hit timeout.
}

【讨论】:

    【解决方案2】:

    如果您想避免将并发逻辑与业务逻辑混为一谈,我编写了这个库https://github.com/shomali11/parallelizer 来帮助您。它封装了并发逻辑,所以你不用担心。

    所以在你的例子中:

    package main
    
    import (
        "github.com/shomali11/parallelizer"
        "fmt"
    )
    
    func main() {
        urls := []string{ ... }
        results = make([]*HttpResponse, len(urls)
    
        options := &Options{ Timeout: time.Second }
        group := parallelizer.NewGroup(options)
        for index, url := range urls {
            group.Add(func(index int, url string, results *[]*HttpResponse) {
                return func () {
                    ...
    
                    results[index] = &HttpResponse{url, response, err}
                }
            }(index, url, &results))
        }
    
        err := group.Run()
    
        fmt.Println("Done")
        fmt.Println(fmt.Sprintf("Results: %v", results))
        fmt.Printf("Error: %v", err) // nil if it completed, err if timed out
    }
    

    【讨论】:

      【解决方案3】:

      将您的结果发送到一个缓冲通道,足以接收所有结果,而不会阻塞,并在主线程的 for-select 循环中读取它们:

      func work(msg string, d time.Duration, ret chan<- string) {
          time.Sleep(d) // Work emulation.
          select {
          case ret <- msg:
          default:
          }
      }
      
      // ...
      
      const N = 2
      ch := make(chan string, N)
      
      go work("printed", 100*time.Millisecond, ch)
      go work("not printed", 1000*time.Millisecond, ch)
      
      timeout := time.After(500 * time.Millisecond)
      loop:
      for received := 0; received < N; received++ {
          select {
          case msg := <-ch:
              fmt.Println(msg)
          case <-timeout:
              fmt.Println("timeout!")
              break loop
          }
      }
      

      游乐场:http://play.golang.org/p/PxeEEJo2dz

      另请参阅:Go Concurrency Patterns: Timing out, moving on

      【讨论】:

      • 不确定为什么这里需要非阻塞发送,除非它只是一种安全措施,以防缓冲区大小与工作人员数量不匹配。否则,当工人在通道上生产某些东西时,计数(建议用于循环)很可能是比 WaitGroup 更好的解决方案。
      【解决方案4】:

      另一种方法是在内部监控它,你的问题是有限的,但我假设你通过循环启动你的 goroutine,即使你不是你可以重构它为你工作但是你可以做这两个例子中的一个,第一个将超时每个请求单独超时,第二个将超时整个请求,如果时间过长则继续

      var wg sync.WaitGroup
      wg.Add(1)
      go func() {
          success := make(chan struct{}, 1)
          go func() {
              // send your request and wait for a response
              // pretend response was received
              time.Sleep(5 * time.Second)
              success <- struct{}{}
              // goroutine will close gracefully after return     
              fmt.Println("Returned Gracefully")
          }()
      
          select {
          case <-success:
              break
          case <-time.After(1 * time.Second):
              break
          }
      
          wg.Done()
          // everything should be garbage collected and no longer take up space
      }()
      
      wg.Wait()
      
      // do whatever with what you got    
      fmt.Println("Done")
      time.Sleep(10 * time.Second)
      fmt.Println("Checking to make sure nothing throws errors after limbo goroutine is done")
      

      或者,如果您只是想要一种通用的简单方法来超时所有请求,您可以执行类似的操作

      var wg sync.WaitGroup
      waiter := make(chan int)
      wg.Add(1)
      go func() {
          success := make(chan struct{}, 1)
          go func() {
              // send your request and wait for a response
              // pretend response was received
              time.Sleep(5 * time.Second)
              success <- struct{}{}
              // goroutine will close gracefully after return     
              fmt.Println("Returned Gracefully")
          }()
      
          select {
          case <-success:
              break
          case <-time.After(1 * time.Second):
              // control the timeouts for each request individually to make sure that wg.Done gets called and will let the goroutine holding the .Wait close
              break
          }
          wg.Done()
          // everything should be garbage collected and no longer take up space
      }()
      
      completed := false
      go func(completed *bool) {
          // Unblock with either wait
          wg.Wait()
          if !*completed {
              waiter <- 1         
              *completed = true
          }       
          fmt.Println("Returned Two")
      }(&completed)
      
      go func(completed *bool) {
          // wait however long
          time.Sleep(time.Second * 5)
          if !*completed {
              waiter <- 1         
              *completed = true
          }       
          fmt.Println("Returned One")
      }(&completed)
      
      
       // block until it either times out or .Wait stops blocking 
       <-waiter
      
      // do whatever with what you got    
      fmt.Println("Done")
      time.Sleep(10 * time.Second)
      fmt.Println("Checking to make sure nothing throws errors after limbo goroutine is done")
      

      这样你的 WaitGroup 将保持同步,你不会有任何 goroutines 留在边缘

      http://play.golang.org/p/g0J_qJ1BUT 在这里尝试一下,您可以更改变量以查看它的工作方式不同

      编辑:我在手机上如果有人可以修复格式,那将非常感谢。

      【讨论】:

        猜你喜欢
        • 2015-08-21
        • 1970-01-01
        • 1970-01-01
        • 2011-07-12
        • 2012-12-24
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多