【问题标题】:Waiting on a sync.Cond with a timeout等待 sync.Cond 超时
【发布时间】:2019-01-29 00:34:22
【问题描述】:

有没有可能用一些简单的方法来做相当于 Java 的方法

wait(long timeMillis)

它在监视器上等待(大致是互斥量+cond)一段指定的时间,如果没有发出信号则返回?

我在文档中找不到任何内容或在谷歌上搜索,虽然当然可以通过制作 WaitGroup 和弹出计时器 goroutine 来玩一些游戏,但这似乎很乏味/烦人/效率低下简单的功能(顺便说一下,我遇到过的任何底层系统线程库都直接支持)

编辑:是的,我们都读过http://www.golang-book.com/10/index.htmhttps://blog.golang.org/pipelines - 同样,创建更多线程是解决此问题的“坏”(非性能)解决方案,并且通道也不太适合此问题。想象一下一个典型的并发服务器 Join() 方法的用例......(请不​​要告诉我反转控件并使用侦听器模式。您并不总是可以更改正在使用的 API。 ..)

【问题讨论】:

  • 你在使用 appengine 吗??
  • 特别是,在 Tim Cooper 给出的链接上,请注意 Ian Lance Taylor 的评论“条件变量通常不是在 Go 中使用的正确东西”。不要尝试像编写 Java 那样编写 Go。
  • @TimCooper - 有趣,感谢您的链接。但我看到的是“这是处理你的用例的错误方法”和......通道和“side-timers”(goroutine /w timer)都不适用于该线程(或我)中的OP。他们只是关闭了他并关闭了他的票。 =/
  • “它如何应用于我的例子”:你没有给出例子!您在没有任何上下文的情况下要求特定的特性/功能,所以我认为它很可能(基于此处包含“如何在 Go 中执行 ?”的问题水平不佳?)上下文本身很可能是在 Go 中以不同的方式更好地完成设计。如果没有任何上下文来说明您为什么觉得需要条件变量,就不可能说得更多。

标签: go sync


【解决方案1】:

您可以使用通道实现仅支持广播(无信号)的条件变量。这是它的快速要点: https://gist.github.com/zviadm/c234426882bfc8acba88f3503edaaa36#file-cond2-go

您也可以在代码中使用这种替换通道并关闭旧通道的技术。 Gist 中的代码使用 unsafe.Pointer 和原子操作来允许调用“广播”而不获取主同步锁。但是,在您自己的代码中,您通常应该从获取锁中进行广播,这样您就不需要做任何不安全/原子的事情。

虽然此方法有效,但您可能还想结帐:https://godoc.org/golang.org/x/sync/semaphore。如果您制作一个限制为 1 的加权信号量,那也将为您提供所需的所有能力,而且这也是公平的。

【讨论】:

  • 你能解释一下使用加权信号量的最后一点吗?
  • 特别是,您如何跟踪要释放多少服务员?
【解决方案2】:

没有。没有简单的方法可以做到这一点,并且基于该线程,他们不会添加一个。 (尽管与他们讨论可能会让你有所收获)

但总有一条艰难的路。 2个选项:

  1. 滚动您自己的具有此功能的Cond。 (见https://golang.org/src/sync/cond.go
  2. 通过系统调用使用操作系统级别的功能。 (也许是futex?)

这里的挑战 - 以及它不是微不足道的原因 - 是 goroutines 不是线程。 Go 有它自己的自定义调度程序。创建您自己的Cond 将涉及修改运行时的某些部分,而这些部分并不是真正要修改的。 (但是,就像我说的,这是可能的)

对不起,如果这是限制。大多数 go 非常简单——你通常可以毫不费力地跳到下层。但是调度器不是这样的。这是魔法。

魔法对大多数事情都有效,他们在sync 中添加了这些东西,以涵盖一些已知的情况,但它不能。如果你觉得你找到了另一个,也许你可以说服他们添加它。 (但这不仅仅是从另一种编程语言复制 API 或暴露底层 API 的问题)

【讨论】:

  • Caleb - 我用来解决这个问题中阻塞 tcp Accept() 调用的技巧:stackoverflow.com/questions/29948497/… 也绝对适用于条件变量(监听器关闭被条件信号替换循环阻塞goroutine)。稍后我会写一个代码示例来回答这个问题,如果我得到它时没有其他人这样做。 (我猜那是“滚动你自己的 Cond”,不过,在第二次阅读时!)
  • (我实际上看到需要一个包来包装 API 中的所有“传统”阻塞函数,这些函数在通道中不能正确选择为期货......)
  • 您在问题中说我们不能使用 goroutine 或通道。您也没有提供可以使用的示例。
  • 是的,你是对的。实际上,当我写它时,我并没有意识到非阻塞选择语法/默认值,这似乎不那么“沉重” - 而且也不像我现在那样迫切地想把一些解决方案放在一起......
【解决方案3】:

我在今年的 GopherCon 演讲中勾勒出了几个可能的替代方案(请参阅 https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view)。 “条件变量”部分从第 37 张幻灯片开始,但此特定模式在备用幻灯片 (101-105) 中有更详细的介绍。

正如 zviadm 所说,一种选择 (https://play.golang.org/p/tWVvXOs87HX) 是关闭频道。

另一种选择 (https://play.golang.org/p/uRwV_i0v13T) 是让每个服务员分配一个 1-buffered 频道,并让广播者将令牌发送到缓冲区中进行广播。

如果事件是持续性条件,例如“队列为空”,第三种选择 (https://play.golang.org/p/uvx8vFSQ2f0) 是使用 1-buffered 通道并让每个接收器重新填充缓冲区,只要条件持续存在。

【讨论】:

    【解决方案4】:

    https://gitlab.com/jonas.jasas/condchan 可以在等待时超时。请看一个例​​子:

    package main
    
    import (
        "fmt"
        "sync"
        "time"
        "gitlab.com/jonas.jasas/condchan"
    )
    
    func main() {
        cc := condchan.New(&sync.Mutex{})
        timeoutChan := time.After(time.Second)
    
        cc.L.Lock()
        // Passing func that gets channel c that signals when
        // Signal or Broadcast is called on CondChan
        cc.Select(func(c <-chan struct{}) { // Waiting with select
            select {
            case <-c: // Never ending wait
            case <-timeoutChan:
                fmt.Println("Hooray! Just escaped from eternal wait.")
            }
        })
        cc.L.Unlock()
    }
    

    【讨论】:

      【解决方案5】:

      我遇到了同样的问题,结果证明使用频道很容易解决。

      • 信号是通道上的发送
      • 等待只是在通道上等待消息。
      • 超时等待只是对计时器和消息的选择。
      • 广播是一个循环发送消息,直到没有人收听为止。

      与任何条件变量一样,在等待时需要保持互斥锁,强烈建议在发出信号时保持互斥锁。

      我编写了一个遵循 Cond 协议并添加了 WaitOrTimeout 的 in 实现。成功返回true,超时返回false。

      这是我的代码以及一些测试用例! 免责声明:这似乎工作正常,但尚未经过彻底测试。此外,不能保证公平。等待的线程按照调度程序认为合适的顺序释放,不一定是先到先得。

      https://play.golang.org/p/K1veAOGbWZ

      package main
      
      import (
          "sync"
          "time"
          "fmt"
      )
      
      type TMOCond struct {
          L    sync.Locker
          ch      chan bool
      }
      
      func NewTMOCond(l sync.Locker) *TMOCond {
          return &TMOCond{ ch: make(chan bool), L: l }
      }
      
      func (t *TMOCond) Wait() {
          t.L.Unlock()
          <-t.ch
          t.L.Lock()
      }
      
      func (t *TMOCond) WaitOrTimeout(d time.Duration) bool {
          tmo := time.NewTimer(d)
          t.L.Unlock()
          var r bool
          select {
          case <-tmo.C:
          r = false
          case <-t.ch:
              r = true
          }
          if !tmo.Stop() {
              select {
              case <- tmo.C:
              default:
              }
          }
          t.L.Lock()
          return r
      }
      
      func (t *TMOCond) Signal() {
          t.signal()
      }
      
      func (t *TMOCond) Broadcast() {
          for {
              // Stop when we run out of waiters
              //
              if !t.signal() {
                  return
              }
          }
      }
      
      func (t *TMOCond) signal() bool {
          select {
          case t.ch <- true:
              return true
          default:
              return false
          }
      }
      
      // **** TEST CASES ****
      func lockAndSignal(t *TMOCond) {
          t.L.Lock()
          t.Signal()
          t.L.Unlock()
      }
      
      func waitAndPrint(t *TMOCond, i int) {
          t.L.Lock()
          fmt.Println("Goroutine", i, "waiting...")
          ok := t.WaitOrTimeout(10 * time.Second)
          t.L.Unlock()
          fmt.Println("This is goroutine", i, "ok:", ok)
      }
      
      func main() {
          var m sync.Mutex
          t := NewTMOCond(&m)
      
          // Simple wait
          //
          t.L.Lock()
          go lockAndSignal(t)
          t.Wait()
          t.L.Unlock()
          fmt.Println("Simple wait finished.")
      
          // Wait that times out
          //
          t.L.Lock()
          ok := t.WaitOrTimeout(100 * time.Millisecond)
          t.L.Unlock()
          fmt.Println("Timeout wait finished. Timeout:", !ok)
      
      
          // Broadcast. All threads should finish.
          //
          for i := 0; i < 10; i++ {
              go waitAndPrint(t, i)
          }
          time.Sleep(1 * time.Second) 
          t.L.Lock()
          fmt.Println("About to signal")
          t.Broadcast()
          t.L.Unlock()
          time.Sleep(10 * time.Second)
      }
      

      【讨论】:

      • 此代码在收听频道之前解锁。这意味着信号可能永远不会通过,因为信号可能在 WaitOrTimeout 解锁之后但在从通道中拉出之前发生,然后可能会发生信号的默认情况,返回 false。
      猜你喜欢
      • 2017-06-04
      • 1970-01-01
      • 2012-04-19
      • 2012-09-20
      • 2015-02-02
      • 1970-01-01
      • 2020-07-19
      • 2016-09-04
      • 2019-07-09
      相关资源
      最近更新 更多