【问题标题】:Rxjs observable wait until some condition is metRxjs 可观察等待,直到满足某些条件
【发布时间】:2018-01-27 00:42:20
【问题描述】:

我有以下重试逻辑来重试操作。它适用于单个请求。对于多个正在进行的请求,我想在重试之前等待现有的重试逻辑完成。

handleError(errors: Observable<any>) {

    const retryCountStart: number = 1;

    // wait if there is any existing operation retrying
    // once it is complete, continue here

    return errors
        .mergeScan<any, any>(
        (retryCount: any, err: any) => {

            if (retryCount <= 5) {
                return Observable.of(retryCount + 1);
            } 

        },retryCountStart)
        .delay(1000);
}

如何在上述方法中添加延迟直到满足某些条件?

【问题讨论】:

    标签: javascript angular typescript rxjs observable


    【解决方案1】:

    您可以使用 async / await 来实现这个目的,并通过 Promise 解析:

    async handleError(errors: Observable<any>) {
    
        const retryCountStart: number = 1;
    
        // wait if there is any existing operation retrying
        // ----------------------------------------------------------
        await new Promise(resolve => {
            // declare some global variable to check in while loop
            while(this.retrying){
                setTimeout(()=> {
                    // Just adding some delay 
                    // (you can remove this setTimeout block if you want)
                },50);
            }
    
            // when while-loop breaks, resolve the promise to continue
            resolve();
        });
        // ----------------------------------------------------------
    
        // once it is complete, continue here
    
        return errors
            .mergeScan<any, any>(
            (retryCount: any, err: any) => {
    
                if (retryCount <= 5) {
                    return Observable.of(retryCount + 1);
                } 
    
            },retryCountStart)
            .delay(1000);
    }
    

    【讨论】:

    • 感谢您的快速回复。如果我使用 async/await,那么我认为我将不得不使用异步更新我的所有调用者函数。没有 async/await 是否可以实现?
    • 不,您必须在此之前使用async,我在答案中以及handleError 方法之前添加了这一点。
    • 如果有任何其他解决方案,那么我不知道。我在我的项目中使用这个没有任何错误:)
    【解决方案2】:

    据我了解,您只想在上一个流完成后才开始下一个流(即将流添加到队列)

    import { Observable, of, BehaviorSubject, from } from 'rxjs';
    import { tap, finalize, filter, take, switchMap, delay } from 'rxjs/operators';
    
    class StreamQueue {
      lastStreamCompleted$: Observable<boolean> = new BehaviorSubject(true);
    
      private runAfter<T>(lastStreamCompleted$: Observable<boolean>, stream$: Observable<T>): [Observable<boolean>, Observable<T>] {
        const newLastStreamCompleted$ = new BehaviorSubject(false);
        const newStream$ = lastStreamCompleted$
          .pipe(
            filter(lastStreamCompleted => lastStreamCompleted),
            take(1),
            switchMap(() => stream$),
            finalize(() => newLastStreamCompleted$.next(true)),
        );
        return [newLastStreamCompleted$, newStream$];
      }
    
      add(stream$: Observable<any>) {
        const [newLastStreamCompleted$, newStream$] = this.runAfter(this.lastStreamCompleted$, stream$);
        this.lastStreamCompleted$ = newLastStreamCompleted$;
        return newStream$;
      }
    }
    
    const streamQueue = new StreamQueue();
    
    streamQueue.add(from([1, 2]).pipe(delay(100))).subscribe(console.log);
    setTimeout(()=>streamQueue.add(from([21, 22]).pipe(delay(100))).subscribe(console.log), 100);
    streamQueue.add(from([11, 12]).pipe(delay(100))).subscribe(console.log);
    
    // Output:
    // 1
    // 2
    // 11
    // 12
    // 21
    // 22
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-01-24
      • 1970-01-01
      • 2017-06-12
      • 2018-05-10
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多