【发布时间】:2017-08-17 15:19:51
【问题描述】:
我目前正在尝试连接到 CEX.IO 比特币交易所的 websocket,但不仅遇到了 CEX.IO 的问题,还遇到了其他问题。我所有的连接都在 120 秒左右下降,这让我觉得有一些 TTL 问题正在发生。主包中的 Process() goroutine 最终只是挂起并等待来自 readLoop 的数据,后者刚刚停止接收数据。我在代码中包含了一些只读 API 密钥,因此您可以根据需要进行测试。
package main
import (
"fmt"
"bitbucket.org/tradedefender/cryptocurrency/exchange-connector/cexio"
"github.com/shopspring/decimal"
"encoding/json"
"time"
)
type OrderBook struct {
Asks []Ask
Bids []Bid
}
type Ask struct {
Rate decimal.Decimal
Amount decimal.Decimal
}
type Bid struct {
Rate decimal.Decimal
Amount decimal.Decimal
}
func main() {
cexioConn := new(cexio.Connection)
err := cexioConn.Connect()
if err != nil {
fmt.Errorf("error: %s", err.Error())
}
err = cexioConn.Authenticate("TLwYkktLf7Im6nqSKt6UO1IrU", "9ImOJcR7Qj3LMIyPCzky0D7WE")
if err != nil {
fmt.Errorf("error: %s", err.Error())
}
readChannel := make(chan cexio.IntraAppMessage, 25)
go cexioConn.ReadLoop(readChannel)
processor := Processor{
WatchPairs: [][2]string{
[2]string{
"BTC", "USD",
},
},
conn: cexioConn,
}
go processor.Process(readChannel)
// LOL
for {
continue
}
}
type Processor struct {
WatchPairs [][2]string
conn *cexio.Connection
}
func (p *Processor) Process(ch <-chan cexio.IntraAppMessage) {
p.conn.SubscribeToOrderBook(p.WatchPairs[0])
pingTimer := time.Now().Unix()
for {
fmt.Printf("(%v)\n", time.Now().Unix())
if (time.Now().Unix() - pingTimer) >= 10 {
fmt.Println("sending ping")
p.conn.SendPing()
pingTimer = time.Now().Unix()
}
readMsg := <- ch
output, _ := json.Marshal(readMsg.SocketMessage)
fmt.Println(string(output))
if readMsg.SocketMessage.Event == "ping" {
fmt.Println("sending pong")
p.conn.SendPong()
pingTimer = time.Now().Unix()
}
}
}
下面是 cexio websocket 的连接器。这是他们 API 的链接:https://cex.io/websocket-api
package cexio
import (
"github.com/gorilla/websocket"
//"github.com/shopspring/decimal"
"github.com/satori/go.uuid"
"encoding/hex"
"encoding/json"
"crypto/hmac"
"crypto/sha256"
"bytes"
"strconv"
"time"
"fmt"
)
const Url = "wss://ws.cex.io/ws/"
type Connection struct {
conn *websocket.Conn
}
type IntraAppMessage struct {
SocketMessage GenericMessage
ProgramMessage ProgramMessage
}
type GenericMessage struct {
Event string `json:"e"`
Data interface{} `json:"data"`
Auth AuthData `json:"auth,omitempty"`
Ok string `json:"ok,omitempty"`
Oid string `json:"oid,omitempty"`
Time int64 `json:"time,omitempty"`
}
type ProgramMessage struct {
Error string
}
type AuthData struct {
Key string `json:"key"`
Signature string `json:"signature"`
Timestamp int64 `json:"timestamp"`
}
type OrderBookSubscribeData struct {
Pair [2]string `json:"pair"`
Subscribe bool `json:"subscribe"`
Depth int `json:"depth"`
}
func (c *Connection) SendPong() error {
pongMsg := GenericMessage{
Event: "pong",
}
err := c.conn.WriteJSON(pongMsg)
if err != nil {
return nil
}
deadline := time.Now().Add(15*time.Second)
err = c.conn.WriteControl(websocket.PongMessage, nil, deadline)
if err != nil {
return err
}
return nil
}
func (c *Connection) SendPing() error {
pingMsg := GenericMessage{
Event: "get-balance",
Oid: uuid.NewV4().String(),
}
err := c.conn.WriteJSON(pingMsg)
if err != nil {
return err
}
deadline := time.Now().Add(15*time.Second)
err = c.conn.WriteControl(websocket.PingMessage, nil, deadline)
if err != nil {
return err
}
return nil
}
func (c *Connection) Connect() error {
dialer := *websocket.DefaultDialer
wsConn, _, err := dialer.Dial(Url, nil)
if err != nil {
return err
}
c.conn = wsConn
//c.conn.SetPingHandler(c.HandlePing)
for {
_, msgBytes, err := c.conn.ReadMessage()
if err != nil {
c.Disconnect()
return err
}
fmt.Println(string(msgBytes))
var m GenericMessage
err = json.Unmarshal(msgBytes, &m)
if err != nil {
c.Disconnect()
return err
}
if m.Event != "connected" {
c.Disconnect()
return err
} else {
break
}
}
return nil
}
func (c *Connection) Disconnect() error {
return c.conn.Close()
}
func (c *Connection) ReadLoop(ch chan<- IntraAppMessage) {
for {
fmt.Println("starting new read")
_, msgBytes, err := c.conn.ReadMessage()
if err != nil {
ch <- IntraAppMessage{
ProgramMessage: ProgramMessage{
Error: err.Error(),
},
}
continue
}
var m GenericMessage
err = json.Unmarshal(msgBytes, &m)
if err != nil {
ch <- IntraAppMessage{
ProgramMessage: ProgramMessage{
Error: err.Error(),
},
}
continue
}
ch <- IntraAppMessage{
SocketMessage: m,
}
}
}
func CreateSignature(timestamp int64, key, secret string) string {
secretBytes := []byte(secret)
h := hmac.New(sha256.New, secretBytes)
var buffer bytes.Buffer
buffer.WriteString(strconv.FormatInt(timestamp, 10))
buffer.WriteString(key)
h.Write(buffer.Bytes())
return hex.EncodeToString(h.Sum(nil))
}
func (c *Connection) Authenticate(key, secret string) error {
timestamp := time.Now().Unix()
signature := CreateSignature(timestamp, key, secret)
var authMsg GenericMessage
authMsg.Event = "auth"
authMsg.Auth = AuthData{
Key: key,
Signature: signature,
Timestamp: timestamp,
}
err := c.conn.WriteJSON(authMsg)
if err != nil {
return err
}
for {
_, msgBytes, err := c.conn.ReadMessage()
if err != nil {
c.Disconnect()
return err
}
fmt.Println(string(msgBytes))
var m GenericMessage
err = json.Unmarshal(msgBytes, &m)
if err != nil {
c.Disconnect()
return err
}
if m.Event != "auth" && m.Ok != "ok" {
c.Disconnect()
return err
} else {
break
}
}
return nil
}
func (c *Connection) SubscribeToOrderBook(pair [2]string) error {
sendMsg := GenericMessage{
Event: "order-book-subscribe",
Data: OrderBookSubscribeData{
Pair: pair,
Subscribe: true,
Depth: 0,
},
Oid: uuid.NewV4().String(),
}
err := c.conn.WriteJSON(sendMsg)
if err != nil {
return err
}
return nil
}
func (c *Connection) GetBalance() error {
sendMsg := GenericMessage{
Event: "get-balance",
Oid: uuid.NewV4().String(),
}
err := c.conn.WriteJSON(sendMsg)
if err != nil {
return err
}
return nil
}
【问题讨论】:
-
这里有很多代码要理解。请在更高层次上描述该程序正在做什么。
-
程序当前所做的只是打印从服务器接收到的消息。在这种情况下,程序连接到 websocket,使用凭证进行身份验证,然后启动 ReadLoop,它只是收集数据并将其推送到通道中。在 ReadLoop 运行后,我启动 Processor.Process goroutine,它在通道上获取这些消息并立即打印它们。它还查找来自服务器的 ping 消息并做出响应。
-
使用比赛检测器运行程序。看起来该协议使用应用程序级 ping/pong。为什么还要发送 websocket 协议 ping/pong?该程序向服务器发送未经请求的 websocket 协议 pong。也许服务器不喜欢那样。可以简化程序以消除 goroutine。
-
@CeriseLimón 我添加了协议级别的乒乓球,因为当我只做应用程序级别的乒乓球时,我遇到了同样的问题,所以有人提到这可能是大猩猩包需要协议乒乓球的问题?跨度>
-
无论好坏,乒乓球/乒乓球是带有 Gorilla 的应用程序的责任。由于缺少消息,Gorilla 不会自行发送 ping 或关闭连接。您应该删除所有 WriteControl,因为您的应用程序没有利用它们,服务器似乎也没有。