【问题标题】:Repeat request (Angular2 - http.get) n seconds after finished完成后 n 秒重复请求(Angular2 - http.get)
【发布时间】:2016-10-22 15:59:16
【问题描述】:

我玩了 angular2,过了一会儿就卡住了。

使用http.get 可以很好地处理单个请求,但我想每 4 秒轮询一次实时数据,经过一段时间的修改并阅读了很多我最终得到的响应式数据:

Observable.timer(0,4000)
  .flatMap(
    () => this._http.get(this._url)
       .share()
       .map(this.extractData)
       .catch(this.handleError)
  )
  .share(); 

http.get-observable 发出请求结果后,是否有一种简单 的方法来启动(4 秒)间隔? (或者我最终会进入 observable-hell?)

我想要的时间线:

Time(s): 0 - - - - - 1 - - - - - 2 - - - - - 3 - - - - - 4 - - - - - 5 - - - - - 6
Action:  Request - - Response - - - - - - - - - - - - - - - - - - - -Request-... 
Wait:                | wait for 4 seconds -------------------------> |

【问题讨论】:

  • observable-hell 是什么意思?
  • 观察 observable 再次触发它。不知何故?
  • 这是您的确切要求,所以我认为这种方法没有问题。
  • 昨天有人问了一个类似的问题:stackoverflow.com/a/37931742/1961059
  • 这也不会在请求完成后等待n秒,它只会延迟一个请求,如果前一个请求的时间比间隔时间长。

标签: javascript angular rxjs observable reactivex


【解决方案1】:

更新到 RxJS 6

import { timer } from 'rxjs';
import { concatMap, map, expand, catchError } from 'rxjs/operators';

pollData$ = this._http.get(this._url)
  .pipe(
    map(this.extractData),
    catchError(this.handleError)
  );

pollData$.pipe(
  expand(_ => timer(4000).pipe(concatMap(_ => pollData$)))
).subscribe();

我正在使用 RxJS 5,但我不确定 RxJS 4 的等效运算符是什么。无论如何,这是我的 RxJS 5 解决方案,希望对您有所帮助:

var pollData = this._http.get(this._url)
            .map(this.extractData)
            .catch(this.handleError);
pollData.expand(
  () => Observable.timer(4000).concatMap(() => pollData)
).subscribe();

expand 操作符将发射数据并在每次发射时递归地启动一个新的 Observable

【讨论】:

  • 好吧,这个比我的好! :) expand 上的文档非常模糊。 :(
  • 仅供参考:递归不会遇到Maximum call stack size exceeded. 错误。使用 >100k 网络调用对其进行了测试。速度仍然很快,并没有耗尽所有 RAM。
  • @Alex 是否因 http 错误而停止轮询取决于您的 this.handleError 函数。如果您使用throwError 发出错误,轮询将终止,但如果您使用of 等发出错误的默认值,则轮询将继续。
  • 你有大理石测试这个场景的例子吗?我尝试了过去 3 天的测试,但没有任何运气。
【解决方案2】:

我自己设法做到了,唯一的缺点是 http.get 不能更容易地重复。

pollData(): Observable<any> {

  //Creating a subject
  var pollSubject = new Subject<any>();

  //Define the Function which subscribes our pollSubject to a new http.get observable (see _pollLiveData() below)
  var subscribeToNewRequestObservable = () => {
    this._pollLiveData()
      .subscribe(
      (res) => { pollSubject.next(res) }
      );
  };

  //Subscribe our "subscription-function" to custom subject (observable) with 4000ms of delay added
  pollSubject.delay(4000).subscribe(subscribeToNewRequestObservable);

  //Call the "subscription-function" to execute the first request
  subscribeToNewRequestObservable();

  //Return observable of our subject
  return pollSubject.asObservable();

}

private _pollLiveData() {

  var url = 'http://localhost:4711/poll/';

  return this._http.get(url)
    .map(
    (res) => { return res.json(); }
    );
};

这就是为什么您不能使用更直接的订阅的原因:

var subscribeToNewRequestObservable = () => {
    this._pollLiveData()
      .subscribe(pollSubject);
  };

http.get-observable 的补全也会补全你的主题并阻止它发出更多的项目。


这仍然是一个 cold observable,因此除非您订阅它,否则不会发出任何请求。

this._pollService.pollData().subscribe(
  (res) => { this.count = res.count; }
);

【讨论】:

    【解决方案3】:

    Can Nguyen 对 answer 进行了小幅修改,以防您希望轮询延迟取决于之前的请求完成状态。

    var pollData = () => request()   // make request
        .do(handler, errorHandler)   // handle response data or error
        .ignoreElements()            // ignore request progress notifications
        .materialize();              // wrap error/complete notif-ns into Notification
    
    pollData()                            // get our Observable<Notification>...
      .expand(                            // ...and recursively map...
        (n) => Rx.Observable              // ...each Notification object...
          .timer(n.error ? 1000 : 5000)   // ...(with delay depending on previous completion status)...
          .concatMap(() => pollData()))   // ...to new Observable<Notification>
      .subscribe();
    

    Plunk.

    或者:

    var pollData = () => request()             // make request
        .last()                                // take last progress value
        .catch(() => Rx.Observable.of(null));  // replace error with null-value
    
    pollData()
      .expand(
        (data) => Rx.Observable
          .timer(data ? 5000 : 1000)           // delay depends on a value
          .concatMap(() => pollData()))
      .subscribe((d) => {console.log(d);});    // can subscribe to the value stream at the end
    

    Plunk.

    【讨论】:

      【解决方案4】:

      如果更方便,您可以尝试使用间隔。调用 subscribe 会为您提供 Subscription,让您可以在一段时间后取消轮询。

      let observer = Observable.interval(1000 * 4);
      let subscription = observer.subsscribe(x => {
          this._http.get(this._url)
           .share()
           .map(this.extractData)
           .catch(this.handleError)
      });
      
      ....
      // if you don't require to poll anymore..
      subscription.unsubscribe();
      

      【讨论】:

      • interval 将在 4 秒后启动,而不是像第一次请求的计时器然后等待。从http.get-request 完成时算起。
      猜你喜欢
      • 2016-07-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-12-13
      • 2018-12-05
      相关资源
      最近更新 更多