【问题标题】:Golang Gorilla Websocket stops receiving information at 120 secondsGolang Gorilla Websocket 在 120 秒停止接收信息
【发布时间】:2017-08-17 15:19:51
【问题描述】:

我目前正在尝试连接到 CEX.IO 比特币交易所的 websocket,但不仅遇到了 CEX.IO 的问题,还遇到了其他问题。我所有的连接都在 120 秒左右下降,这让我觉得有一些 TTL 问题正在发生。主包中的 Process() goroutine 最终只是挂起并等待来自 readLoop 的数据,后者刚刚停止接收数据。我在代码中包含了一些只读 API 密钥,因此您可以根据需要进行测试。

package main

import (
  "fmt"
  "bitbucket.org/tradedefender/cryptocurrency/exchange-connector/cexio"
  "github.com/shopspring/decimal"
  "encoding/json"
  "time"
)

type OrderBook struct {
  Asks []Ask
  Bids []Bid
}

type Ask struct {
  Rate    decimal.Decimal
  Amount  decimal.Decimal
}

type Bid struct {
  Rate    decimal.Decimal
  Amount  decimal.Decimal
}

func main() {
  cexioConn := new(cexio.Connection)

  err := cexioConn.Connect()
  if err != nil {
    fmt.Errorf("error: %s", err.Error())
  }

  err = cexioConn.Authenticate("TLwYkktLf7Im6nqSKt6UO1IrU", "9ImOJcR7Qj3LMIyPCzky0D7WE")
  if err != nil {
    fmt.Errorf("error: %s", err.Error())
  }

  readChannel := make(chan cexio.IntraAppMessage, 25)

  go cexioConn.ReadLoop(readChannel)

  processor := Processor{
    WatchPairs: [][2]string{
      [2]string{
        "BTC", "USD",
      },
    },
    conn: cexioConn,
  }

  go processor.Process(readChannel)

  // LOL
  for {
    continue
  }

}

type Processor struct {
  WatchPairs [][2]string
  conn *cexio.Connection
}

func (p *Processor) Process(ch <-chan cexio.IntraAppMessage) {

  p.conn.SubscribeToOrderBook(p.WatchPairs[0])

  pingTimer := time.Now().Unix()
  for {

    fmt.Printf("(%v)\n", time.Now().Unix())

    if (time.Now().Unix() - pingTimer) >= 10 {
      fmt.Println("sending ping")
      p.conn.SendPing()
      pingTimer = time.Now().Unix()
    }

    readMsg := <- ch
    output, _ := json.Marshal(readMsg.SocketMessage)
    fmt.Println(string(output))

    if readMsg.SocketMessage.Event == "ping" {
      fmt.Println("sending pong")
      p.conn.SendPong()
      pingTimer = time.Now().Unix()
    }

  }
}

下面是 cexio websocket 的连接器。这是他们 API 的链接:https://cex.io/websocket-api

package cexio

import (
  "github.com/gorilla/websocket"
  //"github.com/shopspring/decimal"
  "github.com/satori/go.uuid"
  "encoding/hex"
  "encoding/json"
  "crypto/hmac"
  "crypto/sha256"
  "bytes"
  "strconv"
  "time"
  "fmt"
)

const Url = "wss://ws.cex.io/ws/"

type Connection struct {
  conn *websocket.Conn
}

type IntraAppMessage struct {
  SocketMessage   GenericMessage
  ProgramMessage  ProgramMessage
}

type GenericMessage struct {
  Event   string      `json:"e"`
  Data    interface{} `json:"data"`
  Auth    AuthData    `json:"auth,omitempty"`
  Ok      string      `json:"ok,omitempty"`
  Oid     string      `json:"oid,omitempty"`
  Time    int64       `json:"time,omitempty"`
}

type ProgramMessage struct {
  Error   string
}

type AuthData struct {
  Key       string  `json:"key"`
  Signature string  `json:"signature"`
  Timestamp int64   `json:"timestamp"`
}

type OrderBookSubscribeData struct {
  Pair      [2]string   `json:"pair"`
  Subscribe bool        `json:"subscribe"`
  Depth     int         `json:"depth"`
}

func (c *Connection) SendPong() error {

  pongMsg := GenericMessage{
    Event: "pong",
  }

  err := c.conn.WriteJSON(pongMsg)
  if err != nil {
    return nil
  }

  deadline := time.Now().Add(15*time.Second)

  err = c.conn.WriteControl(websocket.PongMessage, nil, deadline)
  if err != nil {
    return err
  }

  return nil

}

func (c *Connection) SendPing() error {

  pingMsg := GenericMessage{
    Event: "get-balance",
    Oid: uuid.NewV4().String(),
  }

  err := c.conn.WriteJSON(pingMsg)
  if err != nil {
    return err
  }

  deadline := time.Now().Add(15*time.Second)

  err = c.conn.WriteControl(websocket.PingMessage, nil, deadline)
  if err != nil {
    return err
  }

  return nil

}

func (c *Connection) Connect() error {
  dialer := *websocket.DefaultDialer
  wsConn, _, err := dialer.Dial(Url, nil)
  if err != nil {
    return err
  }

  c.conn = wsConn
  //c.conn.SetPingHandler(c.HandlePing)

  for {

    _, msgBytes, err := c.conn.ReadMessage()
    if err != nil {
      c.Disconnect()
      return err
    }

    fmt.Println(string(msgBytes))

    var m GenericMessage
    err = json.Unmarshal(msgBytes, &m)
    if err != nil {
      c.Disconnect()
      return err
    }

    if m.Event != "connected" {
      c.Disconnect()
      return err
    } else {
      break
    }

  }

  return nil
}

func (c *Connection) Disconnect() error {
  return c.conn.Close()
}

func (c *Connection) ReadLoop(ch chan<- IntraAppMessage) {
  for {

    fmt.Println("starting new read")

    _, msgBytes, err := c.conn.ReadMessage()
    if err != nil {
      ch <- IntraAppMessage{
        ProgramMessage: ProgramMessage{
          Error: err.Error(),
        },
      }
      continue
    }

    var m GenericMessage
    err = json.Unmarshal(msgBytes, &m)
    if err != nil {
      ch <- IntraAppMessage{
        ProgramMessage: ProgramMessage{
          Error: err.Error(),
        },
      }
      continue
    }

    ch <- IntraAppMessage{
      SocketMessage: m,
    }

  }
}

func CreateSignature(timestamp int64, key, secret string) string {
  secretBytes := []byte(secret)
  h := hmac.New(sha256.New, secretBytes)

  var buffer bytes.Buffer
  buffer.WriteString(strconv.FormatInt(timestamp, 10))
  buffer.WriteString(key)

  h.Write(buffer.Bytes())

  return hex.EncodeToString(h.Sum(nil))
}

func (c *Connection) Authenticate(key, secret string) error {

  timestamp := time.Now().Unix()
  signature := CreateSignature(timestamp, key, secret)

  var authMsg GenericMessage
  authMsg.Event = "auth"
  authMsg.Auth = AuthData{
    Key: key,
    Signature: signature,
    Timestamp: timestamp,
  }

  err := c.conn.WriteJSON(authMsg)
  if err != nil {
    return err
  }

  for {
    _, msgBytes, err := c.conn.ReadMessage()
    if err != nil {
      c.Disconnect()
      return err
    }

    fmt.Println(string(msgBytes))

    var m GenericMessage
    err = json.Unmarshal(msgBytes, &m)
    if err != nil {
      c.Disconnect()
      return err
    }

    if m.Event != "auth" && m.Ok != "ok" {
      c.Disconnect()
      return err
    } else {
      break
    }
  }

  return nil

}

func (c *Connection) SubscribeToOrderBook(pair [2]string) error {

  sendMsg := GenericMessage{
    Event: "order-book-subscribe",
    Data: OrderBookSubscribeData{
      Pair: pair,
      Subscribe: true,
      Depth: 0,
    },
    Oid: uuid.NewV4().String(),
  }

  err := c.conn.WriteJSON(sendMsg)
  if err != nil {
    return err
  }

  return nil

}

func (c *Connection) GetBalance() error {

  sendMsg := GenericMessage{
    Event: "get-balance",
    Oid: uuid.NewV4().String(),
  }

  err := c.conn.WriteJSON(sendMsg)
  if err != nil {
    return err
  }

  return nil

}

【问题讨论】:

  • 这里有很多代码要理解。请在更高层次上描述该程序正在做什么。
  • 程序当前所做的只是打印从服务器接收到的消息。在这种情况下,程序连接到 websocket,使用凭证进行身份验证,然后启动 ReadLoop,它只是收集数据并将其推送到通道中。在 ReadLoop 运行后,我启动 Processor.Process goroutine,它在通道上获取这些消息并立即打印它们。它还查找来自服务器的 ping 消息并做出响应。
  • 使用比赛检测器运行程序。看起来该协议使用应用程序级 ping/pong。为什么还要发送 websocket 协议 ping/pong?该程序向服务器发送未经请求的 websocket 协议 pong。也许服务器不喜欢那样。可以简化程序以消除 goroutine。
  • @CeriseLimón 我添加了协议级别的乒乓球,因为当我只做应用程序级别的乒乓球时,我遇到了同样的问题,所以有人提到这可能是大猩猩包需要协议乒乓球的问题?跨度>
  • 无论好坏,乒乓球/乒乓球是带有 Gorilla 的应用程序的责任。由于缺少消息,Gorilla 不会自行发送 ping 或关闭连接。您应该删除所有 WriteControl,因为您的应用程序没有利用它们,服务器似乎也没有。

标签: go websocket gorilla


【解决方案1】:

解决方案是删除

for { 
  continue 
}

在主函数的末尾

【讨论】:

  • 哈,我没有在所有代码中注意到这一点。使用select {} 永远阻塞一个goroutine。更好的是,直接调用 processor.Process(readChannel) 而不是启动 goroutine。更好的是,删除所有的 goroutine。它们不是必需的。
  • 由于不需要任何 goroutine,我假设您会建议将它们全部集成到一个函数中?
  • ReadLoop 中删除循环,并在每次调用该函数时返回一个 IntraAppMessage。将函数重命名为Read。在Process 中调用Read 而不是从通道接收值。
  • 好电话,我自己不会想到的。感谢大家的帮助!
  • @jrgilman 如果这是答案,您应该尽可能将其标记为正确,以便它在堆栈中被过滤掉。
猜你喜欢
  • 2018-08-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-08-30
  • 1970-01-01
  • 2016-05-24
  • 2023-01-12
  • 2018-07-22
相关资源
最近更新 更多