【问题标题】:How to get rxjs retry to work with websocket connection如何让 rxjs 重试使用 websocket 连接
【发布时间】:2018-04-02 03:50:09
【问题描述】:

我正在开发一个需要使用 websocket 的项目,以便服务器可以快速将数据推送到 web UI。我正在使用 Angular v5 和 angular-cli。但是,我很难让 websocket 连接在失败时重试。

对于那些对我的项目当前状态感兴趣的人,可以找到here

我关注了this tutorial,但还没有真正掌握 rxjs 原则。我一直在做很多试验和错误,才能达到现在的水平。

所以我创建了一个 websocket 服务来设置Observable

export class WebsocketService {
constructor() { }

private subject: Rx.Subject<MessageEvent>;

public connect(url): Rx.Subject<MessageEvent> {
   if (!this.subject) {
      this.subject = this.create(url);
   }
   return this.subject;
}

private create(url): Rx.Subject<MessageEvent> {
  let ws = new WebSocket(url);

  let observable = Rx.Observable.create(
     (obs: Rx.Observer<MessageEvent>) => {
        ws.onmessage = obs.next.bind(obs);
        ws.onerror = obs.error.bind(obs);
        ws.onclose = obs.complete.bind(obs);
        return ws.close.bind(ws);
     })
  let observer = {
     next: (data: Object) => {
        if (ws.readyState === WebSocket.OPEN) {
           ws.send(JSON.stringify(data));
        }
     }
  }
  return Rx.Subject.create(observer, observable);
 }

}

然后是RaceDataService,它使用WebSocketService 将数据映射到有用的东西:

export class RaceDataService {

   public dataStream: Subject<any>;

   constructor(wsService: WebsocketService) {
      this.dataStream = <Subject<any>>wsService
         .connect(ETS_URL)
         .map((response:any): any => {
            let data = JSON.parse(response.data);
            return data
         })
         .retry(3)
   }

   sendAction(action:WebsocketAction) {
      this.dataStream.next(action);
   }
}

最后,我订阅了 race-display 组件中的 observable,如下所示:

constructor(private raceDataService:RaceDataService) {
    raceDataService.dataStream.subscribe(
      msg => {
        console.log("Response from websocket: ");
        console.log(msg);
        if(msg.RaceData) {
          this.HandleCurrentRaceData(msg.RaceData);
        }
      },
      err => {
        this.isConnected = false;
      },
      () => {
        this.sessionEnded = true;
      }
    )
  }

现在根据我的阅读,在设置可观察对象之后放置.retry(3) 应该确保连接在失败后重试 3 次。最终这还不够,我将不得不使用retryWhen() 来做一些更高级的事情,但此时这个简单的重试甚至不起作用,websocket 连接只是尝试了一次,仅此而已。

如果有任何指点,我将不胜感激。

【问题讨论】:

  • 据我所知,SignalR 也需要服务器端实现。我使用的是 ESP32(Arduino-ish 设备),所以我必须通过 websocket 处理 json 数据
  • 你在哪里订阅dataStream?在主题上使用map() 只是向管道添加了另一个步骤,但我认为它不会激活它 - subscribe() 会这样做。
  • 你可以尝试注释掉这一行 ws.onclose = obs.complete.bind(obs);看看它是否有效
  • @FanCheung 感谢您的建议,但结果是一样的,只有 1 次连接尝试

标签: angular websocket rxjs


【解决方案1】:

在 SO 问题 WebSocket: How to automatically reconnect after it dies 中注意,他们谈论在套接字关闭时重新连接。

在您的代码中,retry(3) 只会响应通过行 ws.onerror = obs.error.bind(obs) 发生的事件,但这些只会是在套接字处于活动状态时发生的错误

要使用 Rx.retry() 重试连接错误,您可以尝试将 websocket onClose 挂钩到可观察的 onError (ws.onclose = obs.error.bind(obs);),但 Rx.retry() 只会重新运行 wsService.connect(ETS_URL),它只会返回Subject,它(因为它现在已实例化)不会调用wsService.create() 来执行重新打开。

我认为您必须在 websocket 级别处理重试,而不是在可观察级别 - 根据引用的问题,使用附加到 onclose 的函数:

  ws.onclose = function(e) {
    console.log('Socket is closed. Reconnect will be attempted in 1 second.', e.reason);
    setTimeout(function() {
      connect();
    }, 1000);
  };

所以,你有三种类型的错误:

1) 重新打开过早关闭的连接

2) 重试失败的初始打开,为此您需要 try..catch let ws = new WebSocket(url)

3) 连接期间发生的错误,由ws.onerror 发出。我不确定它们是什么,所以不确定当前的策略(将它们传递给 observable)是否是最好的。请注意,可观察流上的错误将关闭流,因此如果可以忽略 ws 错误(即出现错误消息但后续消息正常),那么您不希望流关闭。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-10-19
    • 2022-08-05
    • 2016-08-09
    • 2023-01-31
    • 2018-10-20
    相关资源
    最近更新 更多