【发布时间】:2025-12-30 23:35:09
【问题描述】:
这是一个棘手的问题,让我很烦恼。
基本上,我编写了一个集成微服务,它使用 Go 客户端提供来自 Binance 加密交换的数据流。客户端发送开始消息,启动符号的数据流,并在某个时候发送关闭消息以停止流。我的实现基本上是这样的:
func (c BinanceClient) StartDataStream(clientType bn.ClientType, symbol, interval string) error {
switch clientType {
case bn.SPOT_LIVE:
wsKlineHandler := c.handlers.klineHandler.SpotKlineHandler
wsErrHandler := c.handlers.klineHandler.ErrHandler
_, stopC, err := binance.WsKlineServe(symbol, interval, wsKlineHandler, wsErrHandler)
if err != nil {
fmt.Println(err)
return err
} else {
c.state.clientSymChanMap[clientType][symbol] = stopC
return nil
}
...
}
clientSymChanMap 将 stopChannel 存储在嵌套的 hashmap 中,以便我稍后可以检索停止通道以停止数据馈送。相应地实现了停止功能:
func (c BinanceClient) StopDataStream(clientType bn.ClientType, symbol string) {
//mtd := "StopDataStream: "
stopC := c.state.clientSymChanMap[clientType][symbol]
if isClosed(stopC) {
DbgPrint(" Channel is already closed. Do nothing for: " + symbol)
} else {
close(stopC)
}
// Delete channel from the map otherwise the next StopAll throws a NPE due to closing a dead channel
delete(c.state.clientSymChanMap[clientType], symbol)
return
}
为了防止已经关闭的通道出现恐慌,我使用了一个检查函数,如果通道已经关闭,则返回 true。
func isClosed(ch <-chan struct{}) bool {
select {
case <-ch:
return true
default:
}
return false
}
看起来不错,但有一个问题。当我只使用一个符号的起始数据运行代码时,它会完全按预期启动和关闭数据馈送。
但是,当启动多个数据馈送时,上面的代码永远不会关闭 websocket,只会永远保持流数据。如果没有 isClosed 检查,我会因为尝试关闭已关闭的通道而感到恐慌,但检查到位后,什么都不会关闭。
查看implementation of the above binance.WsKlineServe 函数时,很明显它只是在每次调用时包装一个新的 websocket,然后返回完成和停止通道。
文档给出了以下使用示例:
wsKlineHandler := func(event *binance.WsKlineEvent) {
fmt.Println(event)
}
errHandler := func(err error) {
fmt.Println(err)
}
doneC, stopC, err := binance.WsKlineServe("LTCBTC", "1m", wsKlineHandler, errHandler)
if err != nil {
fmt.Println(err)
return
}
<-doneC
因为 doneC 通道实际上是阻塞的,所以我删除了它,并认为存储 stopC 通道然后使用它来停止数据馈送会起作用。但是,它只对一个实例这样做。当多个流打开时,这将不再起作用。
知道这是什么情况以及如何解决它吗?
【问题讨论】:
-
似乎可以改进包
go-binance。它应该使用可取消的上下文来关闭正在运行的进程,因为这会抽象出关闭通道的复杂性。 -
@tehSpinx,您能否发布有关正确使用可取消上下文的链接或一些资源?我有一些令人不快的 websockets ...
-
在下面查看我的答案
标签: go websocket channel goroutine binance