【问题标题】:Priority in Go select statement workaroundGo select 语句解决方法中的优先级
【发布时间】:2012-06-22 11:04:16
【问题描述】:

我希望在两个频道上进行常规收听,当两个频道都耗尽时被阻塞。但是,如果两个通道都包含数据,我希望在处理另一个通道之前先耗尽一个通道。

在下面的工作示例中,我希望在处理 exit 之前将所有 out 排空。我使用没有任何优先顺序的select-statement。我该如何解决这个问题,在退出前处理所有 10 个输出值?

package main

import "fmt"

func sender(out chan int, exit chan bool){
    for i := 1; i <= 10; i++ {
        out <- i
    } 
    exit <- true
}

func main(){
    out := make(chan int, 10)
    exit := make(chan bool)

    go sender(out, exit)

    L:
    for {
        select {
            case i := <-out:
                fmt.Printf("Value: %d\n", i)
            case <-exit:
                fmt.Println("Exiting")
                break L
        }
    }
    fmt.Println("Did we get all 10? Most likely not")
}

【问题讨论】:

  • 你给的例子,只需要out通道,发送完成后关闭即可。

标签: select concurrency go channel


【解决方案1】:

另一种方法:

package main

import "fmt"

func sender(c chan int) chan int {
        go func() {
                for i := 1; i <= 15; i++ {
                        c <- i
                }
                close(c)
        }()
        return c
}

func main() {
        for i := range sender(make(chan int, 10)) {
                fmt.Printf("Value: %d\n", i)
        }
        fmt.Println("Did we get all 15? Surely yes")
}

$ go run main.go
Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12
Value: 13
Value: 14
Value: 15
Did we get all 15? Surely yes
$ 

【讨论】:

  • 感谢您的建议!如果我理解正确,您建议只使用一个通道,通过关闭通道来调用退出,从而破坏for range-statement。没错,也许这是一种更好的方法,但就我而言,我正在使用两个渠道。
【解决方案2】:

我创建了一个相当简单的解决方法。它可以满足我的要求,但如果其他人有更好的解决方案,请告诉我:

exiting := false
for !exiting || len(out)>0 {
    select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
        case <-exit:
            exiting = true
            fmt.Println("Exiting")
    }
}

我不是在接收时退出,而是标记一个退出,一旦我确定 chan out 中没有任何内容就退出。

【讨论】:

  • 这很有效,而且很紧凑,但是使用了一些你应该尽量避免的技巧。随着程序变得越来越大,标志会变得混乱。它们有点像 gotos。更严重的是, len(chan) 通常可以引入种族。在这种情况下看起来没问题,但在许多情况下,基于 len(chan) 做出决定是无效的,因为它可以在你采取行动之前改变。想象一下这样的情况,你得到 len==0,然后一个值到达,然后一个出口到达,然后 select 选择出口。您可能会耸耸肩说他们几乎同时到达,但在某些时间紧迫的项目中,这可能很重要。
  • 嗯,也许它在我描述的情况下仍然有效。对不起,如果这是一个不好的例子。但无论如何,我尽量避免在同步代码中使用 len。
  • 你好索尼娅:)。很好的输入。是的,就我而言,这并不重要。我只是想在退出之前冲洗掉出去的东西。但是,我实际上使用 for rangeclose(out) 重新编写了代码(如 jmnl 所建议的那样)。然后只有在关闭之前放置在通道管道中的输出事件会被“刷新”。如果纳斯达克要求我为他们做一些围棋程序,我会避免基于 len(chan) 做出决策;)
【解决方案3】:
package main

import "fmt"

func sender(out chan int, exit chan bool) {
    for i := 1; i <= 10; i++ {
        out <- i
    }
    exit <- true
}

func main() {
    out := make(chan int, 10)
    exit := make(chan bool)

    go sender(out, exit)

    for {
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
            continue
        default:
        }
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
            continue
        case <-exit:
            fmt.Println("Exiting")
        }
        break
    }
    fmt.Println("Did we get all 10? I think so!")
}

第一个选择的默认情况使其非阻塞。 select 将在不查看退出通道的情况下耗尽 out 通道,否则不会等待。如果输出通道为空,则立即下降到第二个选择。第二个选择是阻塞的。它将等待任一通道上的数据。如果出现退出,它会处理它并允许循环退出。如果数据来了,它会回到循环的顶部并返回到排水模式。

【讨论】:

  • 这个想法和我自己的很相似。但确实如此,使用continue-statement,您就不需要标志了。聪明的。好吧,这可能是我能想到的最好的答案。谢谢!
  • 如果输出通道关闭,这将在第一个选择语句中无限循环。
  • jorelli,非常正确。如果你想允许恶意或有问题的 goroutine 意外关闭通道,你需要检查接收的 ok 状态。
  • 这实际上不是一个完全正确的解决方案,因为两个队列都可以在单个上下文切换中接收数据。当多个队列就绪时select 的行为是不确定的(伪随机)。
  • 这不正确。在第二个select阻塞时,如果数据到达outexit通道,不能保证out中的数据会在exit之前被处理。我实际上认为渠道没有解决方案。
【解决方案4】:

该语言本机支持此功能,无需解决方法。很简单:退出通道应该只对生产者可见。退出时,生产者关闭频道。只有当通道为空且关闭时,消费者才会退出。这可以通过对频道进行测距来实现。

这里有一个例子来说明:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

var (
    produced  = 0
    processed = 0
)

func produceEndlessly(out chan int, quit chan bool) {
    defer close(out)
    for {
        select {
        case <-quit:
            fmt.Println("RECV QUIT")
            return
        default:
            out <- rand.Int()
            time.Sleep(time.Duration(rand.Int63n(5e6)))
            produced++
        }
    }
}

func quitRandomly(quit chan bool) {
    d := time.Duration(rand.Int63n(5e9))
    fmt.Println("SLEEP", d)
    time.Sleep(d)
    fmt.Println("SEND QUIT")
    quit <- true
}

func main() {
    vals, quit := make(chan int, 10), make(chan bool)
    go produceEndlessly(vals, quit)
    go quitRandomly(quit)
    for x := range vals {
        fmt.Println(x)
        processed++
        time.Sleep(time.Duration(rand.Int63n(5e8)))
    }
    fmt.Println("Produced:", produced)
    fmt.Println("Processed:", processed)
}

【讨论】:

  • 谢谢,这正是我一直在寻找的解决方案,它没有索尼娅回答中的潜在竞争条件错误
  • 只在主程序中的 vals 通道范围内工作
  • 值得注意的是,虽然在问题的前提下完全正确,但这不适用于“N-producers-1-consumer”的情况,因为在生产者之间没有同步的情况下关闭 out 频道可能会引发恐慌。鸡-蛋-问题,因为这样的同步需要在quitout之间进行优先级选择:)
【解决方案5】:

在我的情况下,我真的想将来自一个通道的数据优先于另一个通道,而不仅仅是有一个带外退出信号。为了其他有同样问题的人的利益,我认为这种方法在没有潜在竞争条件的情况下有效:

OUTER:
for channelA != nil || channelB != nil {

    select {

    case typeA, ok := <-channelA:
        if !ok {
            channelA = nil
            continue OUTER
        }
        doSomething(typeA)

    case nodeIn, ok := <-channelB:
        if !ok {
            channelB = nil
            continue OUTER
        }

        // Looped non-blocking nested select here checks that channelA
        // really is drained before we deal with the data from channelB
        NESTED:
        for {
            select {
            case typeA, ok := <-channelA:
                if !ok {
                    channelA = nil
                    continue NESTED
                }
                doSomething(typeA)

            default:
                // We are free to process the typeB data now
                doSomethingElse(typeB)
                break NESTED
            }
        }
    }

}

【讨论】:

    【解决方案6】:

    我认为索尼娅的回答是错误的。这是我的解决方案,有点复杂。

    package main
    
    import "fmt"
    
    func sender(out chan int, exit chan bool){
        for i := 1; i <= 10; i++ {
            out <- i
        } 
        exit <- true
    }
    
    func main(){
        out := make(chan int, 10)
        exit := make(chan bool)
    
        go sender(out, exit)
    
        L:
        for {
            select {
                case i := <-out:
                    fmt.Printf("Value: %d\n", i)
                case <-exit:
                    for{
                        select{
                        case i:=<-out:
                            fmt.Printf("Value: %d\n", i)
                        default:
                            fmt.Println("Exiting")
                            break L
                        }
                    }
                    fmt.Println("Exiting")
                    break L
            }
        }
        fmt.Println("Did we get all 10? Yes!")
    }
    

    【讨论】:

      【解决方案7】:

      使用缓冲通道make(chan int, 10)有什么具体原因吗?

      您需要使用您正在使用的无缓冲通道与缓冲通道。

      只需删除10,它应该只是make(chan int)

      这种方式在sender 函数中的执行只能继续到exit &lt;- true 语句来自out 通道的最后一条消息被i := &lt;-out 语句出列之后。如果该语句尚未执行,则无法在 goroutine 中访问 exit &lt;- true

      【讨论】:

        【解决方案8】:

        这是另一种选择。

        消费者代码:

          go func() {
            stop := false
            for {
              select {
              case item, _ := <-r.queue:
                doWork(item)
              case <-r.stopping:
                stop = true
              }
              if stop && len(r.queue) == 0 {
                break
              }
            }
          }()
        

        【讨论】:

          【解决方案9】:

          这是一个解决select的优先级问题的通用习语。

          是的,至少可以说不太好,但可以做到 100%,没有陷阱,也没有隐藏的限制

          这是一个简短的代码示例,解释如下

          package main
          
          import(
              "fmt"
              "time"
          )
          
          func sender(out chan int, exit chan bool) {
              for i := 1; i <= 10; i++ {
                  out <- i
              }
          
              time.Sleep(2000 * time.Millisecond)
              out <- 11
              exit <- true
          }
          
          func main(){
              out := make(chan int, 20)
              exit := make(chan bool)
          
              go sender(out, exit)
          
              time.Sleep(500 * time.Millisecond)
          
              L:
              for {
                  select {
                  case i := <-out:
                      fmt.Printf("Value: %d\n", i)
                  default:
                      select {
                      case i := <-out:
                          fmt.Printf("Value: %d\n", i)
                      case <-exit:
                          select {
                          case i := <-out:
                              fmt.Printf("Value: %d\n", i)
                          default:
                              fmt.Println("Exiting")
                              break L
                          }
                      }
                  }
              }
              fmt.Println("Did we get all 10? Yes.")
              fmt.Println("Did we get 11? DEFINITELY YES")
          }
          

          下面是它的工作原理,上面的 main() 注释:

          func main(){
              out := make(chan int, 20)
              exit := make(chan bool)
              go sender(out, exit)
              time.Sleep(500 * time.Millisecond)
              L:
              for {
                  select {
          
                      // here we go when entering next loop iteration
                      // and check if the out has something to be read from
          
                      // this select is used to handle buffered data in a loop
          
                  case i := <-out:
                      fmt.Printf("Value: %d\n", i)
                  default:
                      // else we fallback in here
          
                      select {
          
                          // this select is used to block when there's no data in either chan
          
                      case i := <-out:
                      // if out has something to read, we unblock, and then go the loop round again
          
                          fmt.Printf("Value: %d\n", i)
                      case <-exit:
                          select {
          
                              // this select is used to explicitly propritize one chan over the another,
                              // in case we woke up (unblocked up) on the low-priority case
          
                              // NOTE:
                              // this will prioritize high-pri one even if it came _second_, in quick
                              // succession to the first one
          
                          case i := <-out:
                              fmt.Printf("Value: %d\n", i)
                          default:
                              fmt.Println("Exiting")
                              break L
                          }
                      }
                  }
              }
          
              fmt.Println("Did we get all 10? Yes.")
              fmt.Println("Did we get 11? DEFINITELY YES")
          }
          

          注意:在玩弄优先级的技巧之前,请确保您正在解决正确的问题。

          很有可能,它可以通过不同的方式解决。

          不过,在 Go 中优先选择选择会是一件很棒的事情。只是一个梦想..

          注意:这是一个非常相似的答案 https://stackoverflow.com/a/45854345/11729048 在这个线程上,但只有 两个 select-s 是嵌套的,而不是像我一样的三个做过。有什么不同?我的方法更有效,我们明确希望在每次循环迭代中处理随机选择。

          但是,如果高优先级通道没有被缓冲,和/或您不希望它有大量数据,只有零星的单个事件, 那么更简单的两阶段习语(如该答案)就足够了:

          L:
          for {
              select {
              case i := <-out:
                  fmt.Printf("Value: %d\n", i)
              case <-exit:
                  select {
                  case i := <-out:
                      fmt.Printf("Value: %d\n", i)
                  default:
                      fmt.Println("Exiting")
                      break L
                  }
              }
          }
          

          基本上是2和3阶段,1被去掉了。

          再说一次:在大约 90% 的情况下,您认为确实需要优先考虑 chan 切换情况,但实际上不需要。

          这是一个单行代码,可以包装在宏中:

          for {
              select { case a1 := <-ch_p1: p1_action(a1); default: select { case a1 := <-ch_p1: p1_action(a1); case a2 := <-ch_p2: select { case a1 := <-ch_p1: p1_action(a1); default: p2_action(a2); }}}
          }
          

          如果您想优先处理两个以上的情况怎么办?

          那么你有两个选择。第一个 - 使用中间 goroutines 构建一棵树,以便每个 fork 都是二进制的(上面的习语)。

          第二个选项是让优先级分叉多于一倍。

          以下是三个优先级的示例:

          for {
              select {
              case a1 := <-ch_p1:
                  p1_action(a1)
              default:
                  select {
                  case a2 := <-ch_p2:
                      p2_action(a2)
                  default:
                      select {    // block here, on this select
                      case a1 := <-ch_p1:
                          p1_action(a1)
                      case a2 := <-ch_p2:
                          select {
                          case a1 := <-ch_p1:
                              p1_action(a1)
                          default:
                              p2_action(a2)
                          }
                      case a3 := <-ch_p3:
                          select {
                          case a1 := <-ch_p1:
                              p1_action(a1)
                          case a2 := <-ch_p2:
                              p1_action(a2)
                          default:
                              p2_action(a3)
                          }
                      }
                  }
              }
          }
          

          也就是说,整个结构在概念上分为三部分,作为原始(二进制)部分。

          再说一次:很有可能,您可以设计您的系统以避免这种混乱。

          P.S.,修辞问题:为什么 Golang 没有将它内置到语言中???问题是修辞问题。

          【讨论】:

            猜你喜欢
            • 2018-02-22
            • 2011-03-23
            • 1970-01-01
            • 1970-01-01
            • 2013-12-04
            • 2010-10-09
            • 2012-07-11
            • 1970-01-01
            • 2018-06-13
            相关资源
            最近更新 更多