【问题标题】:How to timeout a semaphore如何使信号量超时
【发布时间】:2013-11-07 23:11:02
【问题描述】:

Go 中的信号量是通过通道实现的:

一个例子是这样的: https://sites.google.com/site/gopatterns/concurrency/semaphores

上下文:

我们有几百台服务器,并且有一些共享资源要限制访问。因此,对于给定的资源,我们希望使用信号量来限制这些服务器只能访问 5 个并发访问。为此,我们计划使用锁定服务器。当一台机器访问资源时,它会首先向锁服务器注册它正在通过密钥访问资源。然后当它完成时,它会向锁服务器发送另一个请求,说它已经完成并释放信号量。这可确保我们将对这些资源的访问限制为最大并发访问数。

问题:如果出现问题,想要优雅地处理。

问题

如何在信号量上实现超时?

示例:

假设我的信号量大小为 5。同时有 10 个进程试图获取信号量中的锁,因此在这种情况下只有 5 个进程会获取它。

有时,进程会在没有响应的情况下死掉(真正的原因解释起来有点复杂,但基本上有时进程可能无法解锁它)因此导致信号量中的空间现在被永久锁定的问题。

所以我想对此有一个超时。以下是一些问题:

进程将在 2 秒到 60 分钟之间运行。

我们有一些竞争条件,因为如果它超时然后进程尝试解锁它,那么我们已经解锁了信号量两次而不是一次。反之亦然,我们先解锁,然后超时。

如何采用上面发布的建议模式并将其转换为具有超时的线程安全信号量?

【问题讨论】:

  • 您的要求有多严格?如果超过 5 台服务器同时访问共享资源,您是否尝试限制对资源的访问,或者是否存在硬故障模式?
  • 这里有多个 Go 计数信号量示例:github.com/tarndt/sema
  • 嗨,这个问题的作者是如何解决这个用例的?还是解决了?

标签: multithreading concurrency thread-safety go semaphore


【解决方案1】:

要弄清楚你想要完成什么有点困难,但据我所知,你正试图让并发 goroutine 访问共享资源并在出现问题时优雅地处理它。我有几个关于如何处理这个问题的建议。

1) 使用同步包中的 WaitGroup:http://golang.org/pkg/sync/#example_WaitGroup

使用这种策略,您基本上可以在每次调用新的 goroutine 之前添加一个计数器,并使用 defer 来确保它从计数器中删除(因此无论它超时还是由于其他原因返回,它仍然会从计数器中删除)。然后你使用wg.Wait() 命令来确保在所有 go 例程都返回之前它不会继续。这是一个示例:http://play.golang.org/p/wnm24TcBZg 请注意,如果没有 wg.Wait(),它将不会等待 go 例程完成,然后再从 main 返回并终止。

2) 使用 time.Ticker 自动超时:http://golang.org/pkg/time/#Ticker

这种方法基本上会设置一个计时器,该计时器将在设定的时间间隔内触发。您可以使用此计时器来控制基于时间的事件。基本上,这必须在 for 循环中运行,等待通道被输入一个滴答声,就像在这个例子中一样:http://play.golang.org/p/IHeqmiFBSS

同样,不完全确定您要完成什么,但您可以考虑将这两种方法结合起来,这样如果您的进程超时并处于循环中,则自动收报机将捕获它并在设定的时间后返回调用 defer wg.Done() 函数,以便继续等待它的代码部分。希望这至少有点帮助。

【讨论】:

  • 对不起,我会尽力解释得更好。但这基本上是一个分布式信号量服务器。限制数百台机器对资源的并发访问。所以 defer / wait.Syncgroup 不起作用。 time.AfterFunc 或 time.Ticker 是个好主意,但是,如果超时然后进程返回并解锁它怎么办?
  • Verran,我刚刚修改了我的问题,希望现在更有意义?
  • 好的,这就是我想出的:play.golang.org/p/Q2VX25ov4T 这还不是很全面,但我认为它可以很好地满足您的要求。 cmets 几乎解释了正在发生的一切,但如果您有更多问题,请随时询问。代码有点复杂,所以它实际上并没有在操场上运行,但它会在我的系统上运行,直到它遇到死锁,但你应该能够为你的目的修改它以避免这种情况。
【解决方案2】:

由于您正在创建分布式锁服务,我假设您的锁服务器侦听端口,当您接受()连接时,您循环,等待每个连接的 goroutine 中的命令。并且当套接字被丢弃时该 goroutine 退出(即:远程节点崩溃)

所以,假设这是真的,你可以做几件事。

1) 创建一个深度与多少并发锁相匹配的通道 2)当你锁定时,向频道发送消息(如果满了会阻塞) 3)解锁时,只需阅读频道中的消息 4)你可以“延迟释放()”(如果你已经锁定,释放会消耗一条消息)

这是一个粗略的工作示例,除了套接字之外的所有内容。 希望这是有道理的。 http://play.golang.org/p/DLOX7m8m6q

package main

import "fmt"

import "time"

type Locker struct {
    ch chan int
    locked bool
}

func (l *Locker) lock(){
    l.ch <- 1
    l.locked=true
}
func (l *Locker) unlock() {
    if l.locked { // called directly or via defer, make sure we don't unlock if we don't have the lock
        l.locked = false // avoid unlocking twice if socket crashes after unlock
        <- l.ch
    }
}

func dostuff(name string, locker Locker) {
    locker.lock()
    defer locker.unlock()
    fmt.Println(name,"Doing stuff")
    time.Sleep(1 * time.Second)
}

func main() {
    ch := make(chan int, 2)
    go dostuff("1",Locker{ch,false})
    go dostuff("2",Locker{ch,false})
    go dostuff("3",Locker{ch,false})
    go dostuff("4",Locker{ch,false})
    time.Sleep(4 * time.Second)
}

【讨论】:

  • 嘿大卫,谢谢你的回答。但是,dostuff 部分实际上是在客户端。我们需要超时信号量的原因是客户端可能会死掉(服务器可能会宕机)。所以信号量本身必须知道和超时锁。不确定这是否有意义?
  • @samol 所以让dostuff 等待客户端告诉它已经完成让客户端的连接消失,在后一种情况下你可以假设它失败并释放资源(或在额外超时或查看客户端是否重新连接后释放它,无论如何)。 IE。要求客户端至少在使用资源的期间保持 TCP 连接。
【解决方案3】:
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
sem := semaphore.NewWeighted(int64(10))

if err := sem.Acquire(ctx, 1); err != nil {
 // - error means timeout else lock
}

【讨论】:

    【解决方案4】:

    一些假设:

    • 您需要大约 5 个服务器一次通过锁定服务器。
    • 对该资源的访问时间较短且长度相似。

    使用配额服务器而不是锁定服务器。以平均(平均,第 75 次等)访问/锁定时间的 5 倍补充配额(一个简单的计数器)。仅在低于最大值时才补充配额。这样一来,您将平均维持大约 5 个并发访问/锁定。

    一些高级功能:

    • 如果共享资源可以检测到它自己的负载,它可以告诉配额服务器它可以进行更多或更少的并发访问。
    • 服务器完成后可以 ping 配额服务器。这不是必需的,但可以更快地释放资源。

    【讨论】:

      【解决方案5】:

      也许这会有所帮助,但我认为这种实现过于广泛
      我将不胜感激有关代码的任何建议。

      package main
      
      import (
         "fmt"
         "time"
         "math/rand"
         "strconv"
      )
      
      type Empty interface{}
      
      type Semaphore struct {
          dur time.Duration
          ch  chan Empty
      }
      
      func NewSemaphore(max int, dur time.Duration) (sem *Semaphore) {
          sem = new(Semaphore)
          sem.dur = dur
          sem.ch = make(chan Empty, max)
          return
      }
      
      type Timeout struct{}
      
      type Work struct{}
      
      var null Empty
      var timeout Timeout
      var work Work
      
      var global = time.Now()
      
      func (sem *Semaphore) StartJob(id int, job func()) {
          sem.ch <- null
          go func() {
              ch := make(chan interface{})
              go func() {
                  time.Sleep(sem.dur)
                  ch <- timeout
              }()
              go func() {
                  fmt.Println("Job ", strconv.Itoa(id), " is started", time.Since(global))
                  job()
                  ch <- work
              }()
              switch (<-ch).(type) {
              case Timeout:
                  fmt.Println("Timeout for job ", strconv.Itoa(id), time.Since(global))
              case Work:
                  fmt.Println("Job ", strconv.Itoa(id), " is finished", time.Since(global))
              }
              <-sem.ch
          }()
      }
      
      func main() {
          rand.Seed(time.Now().Unix())
          sem := NewSemaphore(3, 3*time.Second)
          for i := 0; i < 10; i++ {
              id := i
              go sem.StartJob(i, func() {
                  seconds := 2 + rand.Intn(5)
                  fmt.Println("For job ", strconv.Itoa(id), " was allocated ", seconds, " secs")
                  time.Sleep(time.Duration(seconds) * time.Second)
              })
          }
          time.Sleep(30 * time.Second)
      }
      

      【讨论】:

        猜你喜欢
        • 2010-10-07
        • 1970-01-01
        • 2017-04-12
        • 2012-09-03
        • 2011-08-09
        • 2010-11-28
        • 1970-01-01
        • 2015-08-05
        • 2014-04-13
        相关资源
        最近更新 更多