【问题标题】:Goroutine safe channel close doesn't actually close webscoketGoroutine 安全通道关闭实际上并没有关闭 websocket
【发布时间】:2025-12-30 23:35:09
【问题描述】:

这是一个棘手的问题,让我很烦恼。

基本上,我编写了一个集成微服务,它使用 Go 客户端提供来自 Binance 加密交换的数据流。客户端发送开始消息,启动符号的数据流,并在某个时候发送关闭消息以停止流。我的实现基本上是这样的:


func (c BinanceClient) StartDataStream(clientType bn.ClientType, symbol, interval string) error {
    
    switch clientType {

    case bn.SPOT_LIVE:
        wsKlineHandler := c.handlers.klineHandler.SpotKlineHandler
        wsErrHandler := c.handlers.klineHandler.ErrHandler

        _, stopC, err := binance.WsKlineServe(symbol, interval, wsKlineHandler, wsErrHandler)
        if err != nil {
            fmt.Println(err)
            return err
        } else {
            c.state.clientSymChanMap[clientType][symbol] = stopC
            return nil
        }
  ... 
}

clientSymChanMap 将 stopChannel 存储在嵌套的 hashmap 中,以便我稍后可以检索停止通道以停止数据馈送。相应地实现了停止功能:


func (c BinanceClient) StopDataStream(clientType bn.ClientType, symbol string) {
    //mtd := "StopDataStream: "

    stopC := c.state.clientSymChanMap[clientType][symbol]

    if isClosed(stopC) {
        DbgPrint(" Channel is already closed. Do nothing for: " + symbol)
    } else {
        close(stopC)
    }
    // Delete  channel from the map otherwise the next StopAll throws a NPE due to closing a dead channel
    delete(c.state.clientSymChanMap[clientType], symbol)
    return
}

为了防止已经关闭的通道出现恐慌,我使用了一个检查函数,如果通道已经关闭,则返回 true。


func isClosed(ch <-chan struct{}) bool {
    select {
    case <-ch:
        return true
    default:
    }
    return false
}

看起来不错,但有一个问题。当我只使用一个符号的起始数据运行代码时,它会完全按预期启动和关闭数据馈送。

但是,当启动多个数据馈送时,上面的代码永远不会关闭 websocket,只会永远保持流数据。如果没有 isClosed 检查,我会因为尝试关闭已关闭的通道而感到恐慌,但检查到位后,什么都不会关闭。

查看implementation of the above binance.WsKlineServe 函数时,很明显它只是在每次调用时包装一个新的 websocket,然后返回完成和停止通道。

文档给出了以下使用示例:


wsKlineHandler := func(event *binance.WsKlineEvent) {
    fmt.Println(event)
}
errHandler := func(err error) {
    fmt.Println(err)
}
doneC, stopC, err := binance.WsKlineServe("LTCBTC", "1m", wsKlineHandler, errHandler)
if err != nil {
    fmt.Println(err)
    return
}
<-doneC 

因为 doneC 通道实际上是阻塞的,所以我删除了它,并认为存储 stopC 通道然后使用它来停止数据馈送会起作用。但是,它只对一个实例这样做。当多个流打开时,这将不再起作用。

知道这是什么情况以及如何解决它吗?

【问题讨论】:

  • 似乎可以改进包go-binance。它应该使用可取消的上下文来关闭正在运行的进程,因为这会抽象出关闭通道的复杂性。
  • @tehSpinx,您能否发布有关正确使用可取消上下文的链接或一些资源?我有一些令人不快的 websockets ...
  • 在下面查看我的答案

标签: go websocket channel goroutine binance


【解决方案1】:

首先,这是危险的:

if isClosed(stopC) {
    DbgPrint(" Channel is already closed. Do nothing for: " + symbol)
} else {
    close(stopC) // <- can't be sure channel is still open
}

无法保证在您对通道状态进行轮询检查后,该通道在下一行代码中仍将处于相同状态。因此,如果同时调用这段代码,理论上它可能会出现恐慌。


如果您希望在通道关闭时发生异步操作 - 最好在其自己的 goroutine 中显式执行此操作。所以你可以试试这个:

go func() {

    stopC := c.state.clientSymChanMap[clientType][symbol]
    <-stopC
    // stopC definitely closed now
    delete(c.state.clientSymChanMap[clientType], symbol)
}()

附:您确实需要在您的地图上使用某种互斥锁,因为删除是异步的 - 您需要确保对地图的任何添加都不会与此数据竞争。

P.P.S 通道在超出范围时由 GC 回收。如果您不再阅读它 - 它们不需要被显式关闭以被 GC 回收。

【讨论】:

  • 谢谢,我明天会修补代码(亚洲时间已晚)并发布更新。感谢单独关闭 goroutine 的想法。这使它成为一个非常实用的解决方案。
  • 这开箱即用并解决了问题。我的意思是,还有一些其他的问题,即缺少互斥锁,但是让所有这些恐慌都脱离盘子真的很有帮助。再次感谢您。
  • 请注意,调用 delete 的 goroutine 应该与在调用 StartDataStream 时为处理流而启动的 goroutine 相同。否则,它也可能最终变得不愉快——考虑一下,如果您多次调用 StopDataStream 并多次调用 StartDataStream 以获取相同的 clientType,会发生什么情况。
  • @bcmills 我也有同样的想法——确实应该在呼叫现场完成。这将简化地图的添加/删除逻辑。
【解决方案2】:

使用通道来停止 goroutine 或关闭某些东西是非常棘手的。您可能会做错或忘记做很多事情。

context.WithCancel 将复杂性抽象出来,使代码更具可读性和可维护性。

一些代码sn-ps:

ctx, cancel := context.WitchCancel(context.TODO())
TheThingToCancel(ctx, ...)

// Whenever you want to stop TheThingToCancel. Can be called multiple times.
cancel()

然后在 for 循环中,您通常会有这样的 select

for {
    select {
    case <-ctx.Done():
        return
    default:
    }

    // do stuff
}

这里有一些代码更接近您打开连接的具体情况:

func TheThingToCancel(ctx context.Context) (context.CancelFunc, error) {
    ctx, cancel := context.WithCancel(ctx)

    conn, err := net.Dial("tcp", ":12345")
    if err != nil {
        cancel()
        return nil, err
    }

    go func() {
        <-ctx.Done()
        _ = conn.Close()
    }()

    go func() {
        defer func() {
            _ = conn.Close()
            // make sure context is always cancelled to avoid goroutine leak
            cancel()
        }()

        var bts = make([]byte, 1024)
        for {
            n, err := conn.Read(bts)
            if err != nil {
                return
            }
            fmt.Println(bts[:n])
        }
    }()

    return cancel, nil
}

它返回cancel 函数以便能够从外部关闭它。

取消上下文可以多次完成,而不会出现panic,如果通道多次关闭,则会发生这种情况。这是一个优势。您还可以从其他上下文派生上下文,从而关闭许多通过关闭父上下文来停止不同例程的上下文。精心设计,这对于关闭属于一起但也需要能够单独关闭的不同例程非常强大。

【讨论】: