【问题标题】:RxJS Asynchronous Request UpdateRxJS 异步请求更新
【发布时间】:2016-04-09 01:35:46
【问题描述】:

所以,感谢 RxJS,我目前正在学习响应式思考。现在,我正在阅读一本 RxJS 书(使用 RxJS 进行反应式编程),并且我已经阅读了 AsyncSubject 以及它如何只缓存收到的最后一个值。我想知道的是,如果我想更新服务器并拥有这个可观察的更新会发生什么?既然调用了 onComplete,我需要创建一个全新的观察者吗?我应该遵循另一种模式吗?

我的总体要求是我希望有一种方法可以干净利落地将数据传入和传出服务器,并始​​终保持我的 observable(模型)新鲜。

谢谢, 李

【问题讨论】:

  • 为什么不将 Observable 与 Websocket 一起使用?
  • 您能否更明确地说明您所寻求的行为。例如,指定输入与预期输出,或绘制大理石图。如果您解释为什么/谁调用 onComplete 也很好
  • 我只需要能够在需要时查询服务器,而不是每次都创建一个新的观察者。我不太确定如何阻止观察者完成。

标签: javascript asynchronous rxjs


【解决方案1】:

正如您所怀疑的,在调用 onCompleted 之后,您无法更改 AsyncSubject 的值。处理简单的“根据需要调用服务器,缓存调用之间的值”场景的典型方法是使用 flatMapLatest 将您的触发器 observable 映射到表示您的服务器调用的 AsyncSubject。例如,如果你想每 30 秒刷新一次数据,你可以这样做:

const subscription = Rx.Observable
   .interval(30000)
   .flatMapLatest(() => serverCall())
   .subscribe(x => doStuffWithResult(x));

【讨论】:

  • 这就是我要找的。在我的例子中,我将使用主题触发器,而不是间隔,以确保仅在需要时才查询服务器。
【解决方案2】:

在你的情况下,我会考虑使用 Websockets,因为你想要 your observable (model) fresh at all times

类似

var source = Rx.Observable.create(function (observer) {
   websocket.onmessage = function( msg ) {
      observer.onNext( msg );
   }

   websocket.onerror = function( error ) {
      observer.onError( error );
   }

   websocket.onclose = function ( msg ) {
      observer.onComplete( msg );
   }
});

否则你可以使用间隔

const subscription = Rx.Observable
   .interval(1000)
   .flatMapLatest(() => Rx.Observable.fromPromise(fetch( ... ).then( response => response.json() ).retry(5))
   .subscribe( response => response );

如果出现错误,您始终可以使用retry 运算符不要立即放弃。

【讨论】:

  • 谢谢,你的第二个例子更好。我目前需要避免使用 websocket,因为 REST 接口已经可用。
猜你喜欢
  • 2016-06-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-08-14
相关资源
最近更新 更多