【发布时间】:2024-01-09 14:01:01
【问题描述】:
连接关闭时为什么这个receiver goroutine拒绝终止
这会按预期运行,但随后随机运行,每调用 20-10,000 次,接收器将无法关闭,然后导致 goroutine 泄漏,导致 100% cpu。
注意:如果我记录所有错误,如果 conn.SetReadDeadline 被注释掉,我将在关闭的通道上看到 read。使用时,我将 i/o 超时视为错误。
这运行了 10k 个周期,其中主进程启动了 11 对这些发送/接收器,它们在主进程发送关闭信号之前处理了 1000 个作业。此设置运行了 6 个小时以上,一夜之间达到 10k 周期标记没有任何问题,但今天早上我无法让它运行超过 20 个周期而没有将接收器标记为未关闭和退出。
func sender(w worker, ch channel) {
var j job
for {
select {
case <-ch.quit: // shutdown broadcast, exit
w.Close()
ch.stopped <- w.id // debug, send stop confirmed
return
case j = <-w.job: // worker designated jobs
case j = <-ch.spawner: // FCFS jobs
}
... prepare job ...
w.WriteToUDP(buf, w.addr)
}
func receiver(w worker, ch channel) {
deadline := 100 * time.Millisecond
out:
for {
w.SetReadDeadline(time.Now().Add(deadline))
// blocking read, should release on close (or deadline)
n, err = w.c.Read(buf)
select {
case <-ch.quit: // shutdown broadcast, exit
ch.stopped <- w.id+100 // debug, receiver stop confirmed
return
default:
}
if n == 0 || err != nil {
continue
}
update := &update{id: w.id}
... process update logic ...
select {
case <-ch.quit: // shutting down
break out
case ch.update <- update
}
}
我需要一种可靠的方法来让接收器在接收到关闭广播或连接关闭时关闭。功能上,关闭通道应该足够了,是根据go包documentation的首选方法,见Conn接口。
我升级到最新版本,即 1.12.1,没有任何变化。 在开发中的 MacOS 和生产中的 CentOS 上运行。
有人遇到过这个问题吗? 如果是这样,您是如何可靠地修复它的?
可能的解决方案
我的非常冗长和讨厌的解决方案似乎可能有效,作为一种变通方法,是这样做:
1) 在 go 例程中启动发送方,如下所示(上图,未更改)
2) 在 go 例程中启动接收器,如下所示(如下)
func receive(w worker, ch channel) {
request := make(chan []byte, 1)
reader := make(chan []byte, 1)
defer func() {
close(request) // exit signaling
w.c.Close() // exit signaling
//close(reader)
}()
go func() {
// untried senario, for if we still have leaks -> 100% cpu
// we may need to be totally reliant on closing request or ch.quit
// defer w.c.Close()
deadline := 100 * time.Millisecond
var n int
var err error
for buf := range request {
for {
select {
case <-ch.quit: // shutdown signal
return
default:
}
w.c.SetReadDeadline(time.Now().Add(deadline))
n, err = w.c.Read(buf)
if err != nil { // timeout or close
continue
}
break
}
select {
case <-ch.quit: // shutdown signal
return
case reader <- buf[:n]:
//default:
}
}
}()
var buf []byte
out:
for {
request <- make([]byte, messageSize)
select {
case <-ch.quit: // shutting down
break out
case buf = <-reader:
}
update := &update{id: w.id}
... process update logic ...
select {
case <-ch.quit: // shutting down
break out
case ch.update <- update
}
}
我的问题是,为什么这个可怕的版本 2 会产生一个新的 go 例程来从阻塞的 c.Read(buf) 中读取数据似乎更可靠地工作,这意味着它在发送关闭信号时不会泄漏,当更简单的第一个版本没有......而且由于阻塞 c.Read(buf),它似乎本质上是一样的。
当这是一个合法且可验证可重复的问题时,将我的问题降级是没有帮助的,该问题仍未得到解答。
【问题讨论】:
-
UDP 不保证传递,但听起来你的代码需要知道消息已传递
-
它在状态管理器中跟踪交付。问题是,当连接关闭时,它不会产生错误(就像它之前的 1784 x 11 次一样),所以它会泄漏,或者如果我没有记录并报告它无法识别和响应到关机信号。
-
没有UDP连接这样的东西,因此也没有关闭UDP连接这样的东西。
-
你没有抓住重点。问题是当我从发件人那里拨打
conn.Close()时,我无法可靠地突破for { if _,err:=conn.Read(buf); err !=nil {break}}。这按预期工作了 10,000 多次,但随后它随机失败并导致 goroutine 泄漏,因为在 conn 上调用 close 语句时它没有关闭/退出 for 循环。为什么?我需要一种可靠且一致的方式来退出 for 循环。如何?这是我的问题。 -
是你没有抓住重点。您没有理由应该突破阻塞的 UDP 读取。没有断开连接事件,因为没有连接断开。它不像 TCP,它在对等方断开连接时提供流结束。如果您想要一条“断开连接”消息,对等方必须向您发送一条消息,并且您必须降低它无序到达、多次到达或根本没有到达的风险。