【问题标题】:Pattern to avoid nested selects with repeated cases避免重复案例的嵌套选择的模式
【发布时间】:2021-06-15 18:02:59
【问题描述】:

我正在阅读 O'Reilly 的 Go 并发书,并找到了以下代码示例:

doWork := func(
    done <-chan interface{},
    pulseInterval time.Duration,
) (<-chan interface{}, <-chan time.Time) {
    heartbeat := make(chan interface{})
    results := make(chan time.Time)
    go func() {
        defer close(heartbeat)
        defer close(results)
        pulse := time.Tick(pulseInterval)
        workGen := time.Tick(2 * pulseInterval) // this just creates workload
        sendPulse := func() {
            select {
            case heartbeat <- struct{}{}:
            default:
            }
        }
        sendResult := func(r time.Time) {
            for {
                select {
                case <-done:
                    return
                case <-pulse:
                    sendPulse()
                case results <- r:
                    return
                }
            }
        }
        for {
            select {
            case <-done:
                return
            case <-pulse:
                sendPulse()
            case r := <-workGen:
                sendResult(r)
            }
        }
    }()
    return heartbeat, results
}

两个 select 语句被用于一个简单的心跳似乎很奇怪。然后我明白了这背后的原因是 sendResults 不应该阻止尝试将结果发送到results 频道。如果没有人从该通道接收,它将有效地阻塞 goroutine 并停止发送心跳。

然后我想……他们为什么不这样编码呢?

doWork := func(
    done <-chan interface{},
    pulseInterval time.Duration,
) (<-chan interface{}, <-chan time.Time) {
    heartbeat := make(chan interface{})
    results := make(chan time.Time)
    go func() {
        defer close(heartbeat)
        defer close(results)
        pulse := time.Tick(pulseInterval)
        workGen := time.Tick(2 * pulseInterval) // this just creates workload
        sendPulse := func() {
            select {
            case heartbeat <- struct{}{}:
            default:
            }
        }
        for {
            select {
            case <-done:
                return
            case <-pulse:
                sendPulse()
            case results <- <- workgen:
                fmt.Println("Send result")
            }
        }
    }()
    return heartbeat, results
}

并意识到这是因为如果 workgen 有一个可用的项目并且我阅读了它但没有人从结果中读取,那么 workgen 项目将会丢失。

问题

是否有任何其他模式可以避免这些必须重复所有情况才能不遗漏值的嵌套选择?

想法

如果一个 select 语句首先验证结果通道正在等待接收一个值而不是先执行右手表达式,那么难道所有这些问题都不会得到解决吗?

换句话说,如果我有这个results &lt;- &lt;- workgen,如果选择说“嘿,没有人从结果中读取,我们不要尝试评估

我认为一个案例应该首先尝试评估表达式中的通道是否准备好接收或发送数据,然后执行表达式。

【问题讨论】:

  • 它会释放工作项并且不会阻塞 select 语句。我编写了代码并对其进行了测试。请问可以验证吗?我很确定我测试了什么。
  • 这种模式 results &lt;- &lt;- workgen 不会按照你的意愿工作,因为 select 语句只会关注 results &lt;- x 部分,而 &lt;-workgen 结果将被立即评估(并被丢弃如果results 还没有准备好)。见Both receive and send in same select case
  • 一个案例应该首先尝试评估表达式中的通道是否准备好接收或发送数据”。问题是这些信息只有在连贯时才有用。如果您看到如果您看到 A 已准备好,然后您看到 B 已准备好,那并不一定意味着 A 和 B 已经或曾经同时准备好。运行时理论上可以解决这个问题,但解决方案可能过于复杂或在调度程序中产生大量噪音。
  • @HymnsForDisco 耶哈,你是对的。该问题的解决方案将非常复杂。另外,假设表达式中的所有通道都准备好了。在评估子表达式时,可能一个或多个通道不再可用(准备好读取和发送)。

标签: go concurrency


【解决方案1】:

在选择中为workgenresults 使用临时变量。根据当前状态将这些变量设置为nil 或原始值。

将变量设置为 nil 会禁用 select 的分支,因为 nil 通道上的通道操作会永远阻塞。

    workGenT := workGen
    var resultsT chan time.Time
    var result time.Time
    for {
        select {
        case <-done:
            return
        case <-pulse:
            sendPulse()
        case result = <-workgenT:
            workgenT = nil
            resultsT = results
        case resultsT <- result:
            resultsT = nil
            workgenT = workgen
        }
    }

因为workgenTresultsT只有一个是非nil,所以上面的代码有两种状态:程序要么等待接收结果,要么等待发送结果。

【讨论】:

  • 哇,这是一个非常聪明的模式。不仅仅是为了解决这个特定的问题,它可以应用在很多地方。这基本上是说“选择语句中的案例可以关闭或重新激活同一语句中的其他案例”。美丽的。另外,我刚刚了解了 nil 通道在 select 语句中的行为,我以为他们会恐慌。
  • 嘿,很抱歉改变这个问题的主题,但我不喜欢这种心跳模式。它使用一个通道来传达脉冲。如果有很多客户端 goroutine 需要检查这个 goroutine 上的脉冲怎么办?频道只会向其中一个客户端 goroutine 发送脉冲,频道不会广播。 Go 中是否有任何广播机制可用于此目的?假设我不知道客户端 goroutine 的数量......因为如果我知道有 N 个客户端,我可以在每个脉冲上发送 N 个脉冲。
【解决方案2】:

另一个简单相似的模式。

func hBeatTicker(ctx context.Context, ch chan<- struct{}, t time.Duration) {
    tick := time.NewTicker(t)
    for {
        select {
        case <-ctx.Done():
            close(ch)
            tick.Stop()
            return
        case <-tick.C:
            ch <- struct{}{}
        }
    }
}

func workGen(ctx context.Context, ch chan<- time.Time, t time.Duration) {
    tick := time.NewTicker(2 * t)
    for {
        select {
        case <-ctx.Done():
            close(ch)
            tick.Stop()
            return
        case ch <- <-tick.C:
        }
    }
}

func do(ctx context.Context, d time.Duration) (<-chan struct{}, <-chan time.Time) {
    heartbeat, results := make(chan struct{}), make(chan time.Time)

    go workGen(ctx, results, d)
    go hBeatTicker(ctx, heartbeat, d)

    return heartbeat, results
}

【讨论】:

  • 好的,把它分成两个 goroutine 使它们独立,并有助于避免重复代码但是,我只是想知道为什么我们需要在 goroutine 中进行健康检查?例如,在哪种情况下它可以“​​停止”工作以让客户说“嘿,那个例程不再健康”?在回答了这个问题之后,这种破坏工作和健康检查装置的策略会影响健康检查的目的吗?
  • 假设 workGen 正在做一些工作,过了一会儿,出现了错误,您想要停止,甚至向心跳发生器发出信号以停止发送健康的心跳。因此,当错误出现时,您可以取消上下文或为此使用其他通道。
猜你喜欢
  • 2017-06-21
  • 1970-01-01
  • 1970-01-01
  • 2017-02-05
  • 2011-12-09
  • 1970-01-01
  • 1970-01-01
  • 2015-08-16
  • 1970-01-01
相关资源
最近更新 更多