【问题标题】:Resetting ReplaySubject in RxJS 6在 RxJS 6 中重置 ReplaySubject
【发布时间】:2018-07-03 01:56:12
【问题描述】:

我有一个可过滤的“活动日志”,目前使用 ReplaySubject 实现(因为有几个组件使用它,而且它们可能在不同的时间订阅)。

当用户更改过滤器设置时,会发出新请求,但结果会附加到 ReplaySubject 而不是替换它。

我想知道是否有办法更新ReplaySubject 以仅使用switchMap 之类的方式发送新项目?

否则,我可能需要使用 BehaviorSubject 来返回所有活动条目的数组,或者重新创建 ReplaySubject 并通知用户(可能通过使用另一个 observable)取消订阅并重新订阅新的 observable。

【问题讨论】:

    标签: javascript rxjs


    【解决方案1】:

    如果您希望能够在不让其订阅者明确取消订阅和重新订阅的情况下重置主题,您可以执行以下操作:

    import { Observable, Subject } from "rxjs";
    import { startWith, switchMap } from "rxjs/operators";
    
    function resettable<T>(factory: () => Subject<T>): {
      observable: Observable<T>,
      reset(): void,
      subject: Subject<T>
    } {
      const resetter = new Subject<any>();
      const source = new Subject<T>();
      let destination = factory();
      let subscription = source.subscribe(destination);
      return {
        observable: resetter.asObservable().pipe(
          startWith(null),
          switchMap(() => destination)
        ),
        reset: () => {
          subscription.unsubscribe();
          destination = factory();
          subscription = source.subscribe(destination);
          resetter.next();
        },
        subject: source
      };
    }
    

    resettable 将返回一个对象,其中包含:

    • 一个observable,可重新设置的主题的订阅者应该订阅它;
    • subject,您可以在其上调用nexterrorcomplete;和
    • reset 函数将重置(内部)主题。

    你会这样使用它:

    import { ReplaySubject } from "rxjs";
    const { observable, reset, subject } = resettable(() => new ReplaySubject(3));
    observable.subscribe(value => console.log(`a${value}`)); // a1, a2, a3, a4, a5, a6
    subject.next(1);
    subject.next(2);
    subject.next(3);
    subject.next(4);
    observable.subscribe(value => console.log(`b${value}`)); // b2, b3, b4, b5, b6
    reset();
    observable.subscribe(value => console.log(`c${value}`)); // c5, c6
    subject.next(5);
    subject.next(6);
    

    【讨论】:

      【解决方案2】:

      这是一个使用之前发布的可重置工厂的类,因此您可以使用 const myReplaySubject = new ResettableReplaySubject&lt;myType&gt;()

      import { ReplaySubject, Subject, Observable, SchedulerLike } from "rxjs";
      import { startWith, switchMap } from "rxjs/operators";
      
      export class ResettableReplaySubject<T> extends ReplaySubject<T> {
      
      reset: () => void;
      
      constructor(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike) {
          super(bufferSize, windowTime, scheduler);
          const resetable = this.resettable(() => new ReplaySubject<T>(bufferSize, windowTime, scheduler));
      
          Object.keys(resetable.subject).forEach(key => {
              this[key] = resetable.subject[key];
          })
      
          Object.keys(resetable.observable).forEach(key => {
              this[key] = resetable.observable[key];
          })
      
          this.reset = resetable.reset;
      }
      
      
      private resettable<T>(factory: () => Subject<T>): {
          observable: Observable<T>,
          reset(): void,
          subject: Subject<T>,
      } {
          const resetter = new Subject<any>();
          const source = new Subject<T>();
          let destination = factory();
          let subscription = source.subscribe(destination);
          return {
              observable: resetter.asObservable().pipe(
                  startWith(null),
                  switchMap(() => destination)
              ) as Observable<T>,
              reset: () => {
                  subscription.unsubscribe();
                  destination = factory();
                  subscription = source.subscribe(destination);
                  resetter.next();
              },
              subject: source,
          };
      }
      }
      

      【讨论】:

        【解决方案3】:

        如果您可以利用缓冲区使用来自原始源的数据这一事实,并且缓冲数据的订阅者可以在接收到所有旧值后切换到原始源,则问题会变得更容易。

        例如。

        let data$ = new Subject<any>() // Data source
        
        let buffer$ = new ReplaySubject<any>() 
        let bs = data$.subscribe(buffer$)  // Buffer subscribes to data
        
        // Observable that returns values until nearest reset
        let getRepeater = () => {
           return concat(buffer$.pipe(
              takeUntil(data$), // Switch from buffer to original source when data comes in
            ), data$)
        }
        

        要清除,请更换缓冲区

        // Begin Buffer Clear Sequence
        bs.unsubscribe()
        buffer$.complete()
        
        buffer$ = new ReplaySubject()
        bs = data$.subscribe(buffer$)
        buffObs.next(buffer$)
        

        为了使代码更实用,您可以将函数 getRepeater() 替换为反映最新参考的主题

        let buffObs = new ReplaySubject<ReplaySubject<any>>(1)
        buffObs.next(buffer$)        
        
        let repeater$ = concat(buffObs.pipe(
           takeUntil(data$),
           switchMap((e) => e),                    
        ), data$)
        

        以下

            let data$ = new Subject<any>()
        
            let buffer$ = new ReplaySubject<any>()
            let bs = data$.subscribe(buffer$)         
        
            let buffObs = new ReplaySubject<ReplaySubject<any>>(1)
            buffObs.next(buffer$)        
        
            let repeater$ = concat(buffObs.pipe(
              takeUntil(data$),
              switchMap((e) => e),                    
            ), data$)
        
            // Begin Test
        
            data$.next(1)
            data$.next(2)
            data$.next(3)
        
            console.log('rep1 sub')
            let r1 = repeater$.subscribe((e) => {          
              console.log('rep1 ' + e)
            })
        
            // Begin Buffer Clear Sequence
            bs.unsubscribe()
            buffer$.complete()
        
            buffer$ = new ReplaySubject()
            bs = data$.subscribe(buffer$)
            buffObs.next(buffer$)
            // End Buffer Clear Sequence
        
            console.log('rep2 sub')
            let r2 = repeater$.subscribe((e) => {
              console.log('rep2 ' + e)
            })
        
            data$.next(4)
            data$.next(5)
            data$.next(6)
        
            r1.unsubscribe()
            r2.unsubscribe()
        
            data$.next(7)
            data$.next(8)
            data$.next(9)        
        
            console.log('rep3 sub')
            let r3 = repeater$.subscribe((e) => {
              console.log('rep3 ' + e)
            })
        

        输出

        rep1 子

        rep1 1

        rep1 2

        rep1 3

        rep2 子

        rep1 4

        rep2 4

        rep1 5

        rep2 5

        rep1 6

        rep2 6

        rep3 子

        rep3 4

        rep3 5

        rep3 6

        rep3 7

        rep3 8

        rep3 9

        【讨论】:

          【解决方案4】:

          我遇到了同样的问题:我的一个组件订阅了共享服务的 ReplaySubject。一旦导航离开并返回仍然传递给组件的以前的值。 仅仅完成主题是不够的。

          出于这个目的,上述解决方案似乎很复杂,但我发现了另一个真正简单的解决方案,只需完成主题并在共享服务中分配一个新创建的解决方案,如下所示:

          constructor() {
              this.selectedFeatures = new ReplaySubject()
              this.selectedFeaturesObservable$ = this.selectedFeatures.asObservable()
          }
          
          completeSelectedFeatures() {
              this.selectedFeatures.complete()
              this.selectedFeatures = new ReplaySubject()
              this.selectedFeaturesObservable$ = this.selectedFeatures.asObservable()
          
          }
          

          我还打印了共享服务的构造函数以显示我使用的类型。 这样,每当我离开我的组件时,我只需在我的共享服务上调用该方法,因此每当我导航回消耗可观察的共享服务的组件时,都会获得一个新的新鲜且空的 ReplaySubject。 我在 ngOnDestroy Angular 生命周期钩子中调用该方法:

          ngOnDestroy() {
              console.log('unsubscribe')
              this.featureSub.unsubscribe()
              this.sharedDataService.completeSelectedFeatures()
          }
          

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 2021-04-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2019-05-07
            • 1970-01-01
            • 2019-02-08
            相关资源
            最近更新 更多