【问题标题】:How do I chain Angular observables?如何链接 Angular 可观察对象?
【发布时间】:2021-08-26 13:10:47
【问题描述】:

我的组件需要在发出 API 请求之前检查是否设置了某些应用首选项。现在我已经这样设置了,我在 2 分钟的计时器上更新我的组件数据:

ngOnInit(): void {
    this.subscription = timer(0, 120 * 1000).subscribe(() => {
        this.shopService.getPreferencesAsObservable().subscribe(preferences => {
            if(preferences) {
                this.getInitialPendingSlotsOrders();
                this.getInitialPendingNoSlotsOrders();
            }
        });
    });
}

getInitialPendingSlotsOrders(){
    this.apiService.fetchShopOrders("status=PENDING&only_slots=true").subscribe(orders=> {
        /* do stuff with orders */
        /* it can happen that I need to get another page of data */
        if(orders.next_page){
            this.getNextSlotsSetOfPendingOrders();
        }
    });
}

getInitialPendingNoSlotsOrders(){
    this.apiService.fetchShopOrders("status=PENDING").subscribe(orders=> {
        /* do stuff with orders */
        /* it can happen that I need to get another page of data */
        if(orders.next_page){
            this.getNextNoSlotsSetOfPendingOrders();
        }
    });
}

getNextSlotsSetOfPendingOrders() { 
    this.apiService.fetchNextSetOfOrders(this.nextSlotsUrl).subscribe(nextSetOfOrders => {
        /* do stuff with next set of orders */
    })
}

getNextNoSlotsSetOfPendingOrders() { 
    this.apiService.fetchNextSetOfOrders(this.nextNoSlotsUrl).subscribe(nextSetOfOrders => {
        /* do stuff with next set of orders */
    })
}

我认为这会起作用,但我遇到了一种情况,我看到正在进行一些额外的 API 调用。我知道这与链接可观察对象有关。我可以做些什么来清理它?

提前谢谢你。

【问题讨论】:

    标签: angular typescript rxjs


    【解决方案1】:

    您有多个嵌套订阅。它们导致多个可能永远不会关闭的开放订阅。相反,您需要使用各种可用的 RxJS 运算符将其限制为单个订阅。

    看到你需要并行触发两个请求,你也可以使用 RxJS forkJoin 函数。

    请参阅here 以快速了解情况。

    总之

    • switchMap 运算符从一个可观察对象映射到另一个对象
    • filter 运算符根据条件继续运算符链
    • forkJoin 并行组合和触发多个可观察对象

    试试下面的

    ngOnInit(): void {
      this.subscription = timer(0, 120 * 1000).pipe(
        switchMap(() => this.shopService.getPreferencesAsObservable()),
        filter(preferences => !!preferences) // emit only if `preferences` is defined and truthy
        switchMap(() => 
          forkJoin({
            slots: getInitialPendingOrders(true),
            noSlots: getInitialPendingOrders(false)
          })
        )
      ).subscribe({
        next: ({ slots, noSlots }) => {
          // do stuff with orders from `slots` and `noSlots` responses
        },
        error: (error: any) => {
          // handle error
        }
      });
    }
    
    getInitialPendingOrders(slots: boolean): Observable<any> {
      return this.apiService.fetchShopOrders("status=PENDING" + slots ? "&only_slots=true" : '');
    }
    

    更新

    根据经验,您应该返回 observable 并仅在需要响应的地方订阅。在您的情况下,您可以将switchMap 传递给forkJoin 的每个参数,并有条件地返回一个可观察对象。当你不想返回任何东西时,返回 RxJS 常量 EMPTY 以从 forkJoin 发出结果。请注意,forkJoin 只会在它的所有源 observables 完成时才会发出。

    ngOnInit(): void {
      this.subscription = timer(0, 120 * 1000).pipe(
        switchMap(() => this.shopService.getPreferencesAsObservable()),
        filter(preferences => !!preferences) // emit only if `preferences` is defined and truthy
        switchMap(() => 
          forkJoin({
            slots: getInitialPendingOrders(true).pipe(
              switchMap((orders: any) => {
                /* do stuff with orders */
                return orders.next_page ? this.getNextSlotsSetOfPendingOrders() : EMPTY;
              })
            ),
            noSlots: getInitialPendingOrders(false).pipe(
              switchMap((orders: any) => {
                /* do stuff with orders */
                return orders.next_page ? this.getNextNoSlotsSetOfPendingOrders() : EMPTY;
              })
            )
          })
        )
      ).subscribe({
        next: ({ slots, noSlots }) => {
          // do stuff with next set of orders from `slots` and `noSlots`
        },
        error: (error: any) => {
          // handle error
        }
      });
    }
    
    getInitialPendingOrders(slots: boolean): Observable<any> {
      return this.apiService.fetchShopOrders("status=PENDING" + !!slots ? "&only_slots=true" : '');
    }
    
    getNextSlotsSetOfPendingOrders(): Observable<any> { 
      return this.apiService.fetchNextSetOfOrders(this.nextSlotsUrl);
    }
    
    getNextNoSlotsSetOfPendingOrders(): Observable<any> { 
      return this.apiService.fetchNextSetOfOrders(this.nextNoSlotsUrl);
    }
    

    【讨论】:

    • 这似乎正是我想要的!但是,我忘了添加我有另一个可能的订阅层(现在更新)的问题......你能看看并更新你的答案吗?谢谢!
    • @ManuelBrás:一切都正确调用了吗?函数getNextSlotsSetOfPendingOrders() 似乎从未被调用过。
    • 我的错!立即更新问题。
    • @ManuelBrás:是的,您可以使用tap 运算符,如tap(preferences =&gt; this.someField = preferences),filter 运算符之前或之后取决于您的要求。如果您需要所有首选项,则可以在过滤器之前。如果您只需要 preferences 传递一个条件,则可以在 filter 运算符之后。
    • @ManuelBrás:是的,如果您需要在getNextSlotsSetOfPendingOrders()(和它的对应项)中触发另一个可观察对象,您可以执行getNextSlotsSetOfPendingOrders().pipe(switchMap(orders =&gt; /* return observable */)) 等等。
    【解决方案2】:

    这是一个非常开放的问题,因为有很多方法可以链接 observables。 Combination operators on Learn RxJs 是查找所有不同选项的好地方。

    在使用 rxjs 一段时间后,我开始养成将复杂进程分离为 const 变量或只读属性的习惯,然后有一个单独的 observable 将它们链接在一起。我发现随着班级的增长,它使事情更容易维护。另外,很多时候一个方法是不必要的,因为它只是设置相同的流 - 只在订阅时执行。

    在下面的示例中,在每次计时器执行后,流将流切换到 getPreferencesAsObservable() 方法,该方法可能会返回一个发射一次的 observable。发射后,filer() 仅在返回有效首选项时使用。最后,forkJoin() 用于合并两个获取车间订单的 observable 的最后结果。

    有一次我选择使用concatMap,而在另一个switchMap。选择前者是为了在一个发射为真而下一个发射为假的情况下不会跳过任何发射。使用 concatMap 会执行以下 observables,但是使用 switchMap 如果之前的发射没有完成,那么它将被终止。真正的选择取决于你想要的行为。

    readonly noSlotsProcess$ = 
      this.apiService.fetchShopOrders("status=PENDING").pipe(
        tap((x) => /* do stuff */)
      );
    readonly slotsProcess$ = 
      this.apiService.fetchShopOrders("status=PENDING&only_slots=true").pipe(
        tap((x) => /* do stuff */)
      );
    readonly refresh$ = timer(0, 120 * 1000).pipe(
      concatMap(() => this.shopService.getPreferencesAsObservable()),
      filter((preferences => !!preferences),
      switchMap(() => { forkJoin( noSlots: this.noSlotsProcess$, slots: this.slotsProcess$ }))
    );
    ngOnInit(): void {
        this.subscription = this.refresh$.subscribe();
    }
    
    

    【讨论】:

    • 尽管@MichaelD 的回答对我有用,但这个解决方案非常干净。我当然也可以从中得到一些东西,所以感谢您抽出宝贵的时间!
    【解决方案3】:

    根据经验,永远不要在subscribe 中调用subscribe(当然也有例外)。

    我建议您阅读Is it good way to call subscribe inside subscribe? 并查看不同的运算符(forkJoinmergeMap、...),RxJs 提供了按您的要求按顺序、并行组合 observables。

    【讨论】:

    • 谢谢!我会看看。作为一名 Angular 业余爱好者,我一直在努力开始使用 RxJs。您是否有任何针对 RxJs 的教程或课程的参考资料,以便我深入学习?
    猜你喜欢
    • 2018-10-25
    • 1970-01-01
    • 2015-01-12
    • 1970-01-01
    • 1970-01-01
    • 2021-01-03
    • 2019-02-18
    • 2020-12-25
    • 2021-12-30
    相关资源
    最近更新 更多