【问题标题】:why does this go routine randomly fail to exit when the blocking read connection is closed?为什么当阻塞读取连接关闭时,这个 goroutine 随机无法退出?
【发布时间】:2024-01-09 14:01:01
【问题描述】:

连接关闭时为什么这个rec​​eiver goroutine拒绝终止

这会按预期运行,但随后随机运行,每调用 20-10,000 次,接收器将无法关闭,然后导致 goroutine 泄漏,导致 100% cpu。

注意:如果我记录所有错误,如果 conn.SetReadDeadline 被注释掉,我将在关闭的通道上看到 read。使用时,我将 i/o 超时视为错误。

这运行了 10k 个周期,其中主进程启动了 11 对这些发送/接收器,它们在主进程发送关闭信号之前处理了 1000 个作业。此设置运行了 6 个小时以上,一夜之间达到 10k 周期标记没有任何问题,但今天早上我无法让它运行超过 20 个周期而没有将接收器标记为未关闭和退出。

func sender(w worker, ch channel) {

    var j job
    for {
        select {
        case <-ch.quit: // shutdown broadcast, exit
            w.Close()
            ch.stopped <- w.id // debug, send stop confirmed
            return

        case j = <-w.job: // worker designated jobs
        case j = <-ch.spawner: // FCFS jobs
        }

        ... prepare job ...

        w.WriteToUDP(buf, w.addr)

}

func receiver(w worker, ch channel) {

    deadline := 100 * time.Millisecond
out:
    for {
        w.SetReadDeadline(time.Now().Add(deadline))
        // blocking read, should release on close (or deadline)
        n, err = w.c.Read(buf)

        select {
        case <-ch.quit: // shutdown broadcast, exit
            ch.stopped <- w.id+100 // debug, receiver stop confirmed
            return
        default:
        }

        if n == 0 || err != nil {
            continue
        }
        update := &update{id: w.id}

         ... process update logic ...

        select {
        case <-ch.quit: // shutting down
            break out
        case ch.update <- update
        }

}

我需要一种可靠的方法来让接收器在接收到关闭广播或连接关闭时关闭。功能上,关闭通道应该足够了,是根据go包documentation的首选方法,见Conn接口。

我升级到最新版本,即 1.12.1,没有任何变化。 在开发中的 MacOS 和生产中的 CentOS 上运行。

有人遇到过这个问题吗? 如果是这样,您是如何可靠地修复它的?


可能的解决方案

我的非常冗长和讨厌的解决方案似乎可能有效,作为一种变通方法,是这样做:

1) 在 go 例程中启动发送方,如下所示(上图,未更改)

2) 在 go 例程中启动接收器,如下所示(如下)

func receive(w worker, ch channel) {

    request := make(chan []byte, 1)
    reader := make(chan []byte, 1)

    defer func() {
        close(request) // exit signaling
        w.c.Close()    // exit signaling
        //close(reader)
    }()

    go func() {

        // untried senario, for if we still have leaks -> 100% cpu
        // we may need to be totally reliant on closing request or ch.quit
        // defer w.c.Close()

        deadline := 100 * time.Millisecond
        var n int
        var err error

        for buf := range request {
            for {
                select {
                case <-ch.quit: // shutdown signal
                    return
                default:
                }
                w.c.SetReadDeadline(time.Now().Add(deadline))
                n, err = w.c.Read(buf)
                if err != nil { // timeout or close
                    continue
                }
                break
            }
            select {
            case <-ch.quit: // shutdown signal
                return
            case reader <- buf[:n]:
                //default:
            }
        }
    }()

    var buf []byte

out:
    for {

        request <- make([]byte, messageSize)

        select {
        case <-ch.quit: // shutting down
            break out
        case buf = <-reader:
        }

        update := &update{id: w.id}

      ... process update logic ...


        select {
        case <-ch.quit: // shutting down
            break out
        case ch.update <- update
        }

    }

我的问题是,为什么这个可怕的版本 2 会产生一个新的 go 例程来从阻塞的 c.Read(buf) 中读取数据似乎更可靠地工作,这意味着它在发送关闭信号时不会泄漏,当更简单的第一个版本没有......而且由于阻塞 c.Read(buf),它似乎本质上是一样的。

当这是一个合法且可验证可重复的问题时,将我的问题降级是没有帮助的,该问题仍未得到解答。

【问题讨论】:

  • UDP 不保证传递,但听起来你的代码需要知道消息已传递
  • 它在状态管理器中跟踪交付。问题是,当连接关闭时,它不会产生错误(就像它之前的 1784 x 11 次一样),所以它会泄漏,或者如果我没有记录并报告它无法识别和响应到关机信号。
  • 没有UDP连接这样的东西,因此也没有关闭UDP连接这样的东西。
  • 你没有抓住重点。问题是当我从发件人那里拨打conn.Close() 时,我无法可靠地突破for { if _,err:=conn.Read(buf); err !=nil {break}}。这按预期工作了 10,000 多次,但随后它随机失败并导致 goroutine 泄漏,因为在 conn 上调用 close 语句时它没有关闭/退出 for 循环。为什么?我需要一种可靠且一致的方式来退出 for 循环。如何?这是我的问题。
  • 是你没有抓住重点。您没有理由应该突破阻塞的 UDP 读取。没有断开连接事件,因为没有连接断开。它不像 TCP,它在对等方断开连接时提供流结束。如果您想要一条“断开连接”消息,对等方必须向您发送一条消息,并且您必须降低它无序到达、多次到达或根本没有到达的风险。

标签: go udp exit goroutine


【解决方案1】:

感谢大家的回复。

所以。从来没有堆栈跟踪。事实上,我根本没有遇到任何错误,没有比赛检测或其他任何错误,也没有陷入僵局,go 例程只是不会关闭和退出,而且它不能始终如一地重现。我已经连续两周运行相同的数据了。

当 go 例程无法报告退出时,它会简单地失控并将 CPU 驱动到 100%,但只有在所有其他例程都退出并且系统继续运行之后。我从未见过记忆增长。 CPU 会逐渐上升到 200%、300%、400%,这时系统必须重新启动。

我在发生泄漏时记录了它,它总是一个不同的泄漏,并且在之前成功运行 380 次之后(共运行 23 对 go 例程),我会发生一次泄漏,下一次是 1832 年在一个接收器之前泄露,下一次只有 23 个,在同一起点咀嚼完全相同的数据。泄漏的接收器刚刚失控,但在其他22名同伴全部关闭并成功退出后,系统才进入下一批。它不会一直失败,除非它保证在某些时候泄漏。

经过很多天,无数次重写,以及每次操作之前/之后的一百万条日志,这最终似乎是问题所在,在挖掘了库之后,我不知道究竟是为什么,也不知道为什么它只是随机发生.

无论出于何种原因,如果您解析并直接跳过问题而不先阅读问题,golang.org/x/net/dns/dnsmessage 库会随机崩溃。不知道为什么这很重要,您好,跳过问题意味着您不关心该标题部分并将其标记为已处理,并且它可以连续运行一百万次,但随后就不行了,所以您似乎必须先阅读问题,然后才能跳过所有问题,因为这似乎是解决方案。我有 18,525 个批次,并且添加这会关闭泄漏。

var p dnsmessage.Parser
h, err := p.Start(buf[:n])
if err != nil {
    continue // what!?
}

switch {
case h.RCode == dnsmessage.RCodeSuccess:
    q, err := p.Question() // should only have one question
    if q.Type != w.Type || err != nil {
        continue // what!?, impossible
    }
    // Note: if you do NOT do the above first, you're asking for pain! (tr)
    if err := p.SkipAllQuestions(); err != nil {
        continue // what!?
    }
    // Do not count as "received" until we have passed above point and
    // validated that response had a question that we could skip...

【讨论】:

  • 当然没有堆栈跟踪,您必须在需要时生成一个。使用 SIGQUIT 停止进程通常是最简单的方法。