【问题标题】:RxJS Observable forkJoin Not Executing in ParallelRxJS Observable forkJoin 未并行执行
【发布时间】:2021-03-25 08:03:57
【问题描述】:

我有以下代码,而且,在我的一生中,无法弄清楚为什么请求不会同时执行。我对 RxJS 和 observables 还是很陌生,因此对于改进以下代码的任何帮助也将不胜感激。基本上,我在后端调用 REST API 以获取一些数据。然后,对于该数据数组中的每个元素,我正在向不同的端点发出另一个请求(因此使用“forkJoin”运算符)。所有请求都是一次发送的,但它们似乎仍然是一个接一个地执行,而不是同时执行。

this.sites$.subscribe(data => {

    // data.forEach(element => {
    //     this.siteCaptureMap[element.id] = new CaptureData();
            
    //     this.sitesService.getCaptureData(element.nameOrNumber, element.owner.name).subscribe(data => {
    //         this.siteCaptureMap[element.id].count = data.length;
    //     });
    // });

    var obs: Observable<any>[] = [];
    for (var _i = 0; _i < data.length; _i++) {
        this.siteCaptureMap[data[_i].id] = new CaptureData();
        this.siteCaptureMap[data[_i].id].id = _i;
        obs.push(this.sitesService.getCaptureData(data[_i].nameOrNumber, data[_i].owner.name));
    }

    forkJoin(obs).subscribe(results => {
        for (var _i = 0; _i < results.length; _i++) {
            this.siteCaptureMap[data[_i].id].count = results[_i].length;
        }
    });


    this.dataSource.data = data;
    this.dataSource.filteredData = data;
});

再次,我们将不胜感激任何帮助。如果我需要澄清任何事情或提供任何其他代码 sn-ps,请告诉我!谢谢!

【问题讨论】:

    标签: javascript angular typescript rxjs rxjs-observables


    【解决方案1】:

    嵌套subscribes 可能会导致内存泄漏,并且很难取消订阅,因此无论何时嵌套subscribes,请考虑switchMapconcatMapmergeMap

    它们都有微小的变化,但它们会从之前的 observable 切换到新的 observable。这个post 解释了这些差异。

    对你来说,我会尝试做:

    import { switchMap } from 'rxjs/operators';
    
    ...
    this.sites$.pipe(
      switchMap(data => {
        let obs: Observable<any>[] = [];
        for (let _i = 0; _i < data.length; _i++) {
            this.siteCaptureMap[data[_i].id] = new CaptureData();
            this.siteCaptureMap[data[_i].id].id = _i;
            obs.push(this.sitesService.getCaptureData(data[_i].nameOrNumber, data[_i].owner.name));
        }
    
        return forkJoin(obs);
      }),
    ).subscribe(results => {
            for (let_i = 0; _i < results.length; _i++) {
                this.siteCaptureMap[data[_i].id].count = results[_i].length;
            }
    
            this.dataSource.data = data;
            this.dataSource.filteredData = data;
        });
    

    附注是使用letconst 而不是var

    另外,如果您看到所有请求同时发出,那么您就可以期待了。如果它连续返回,则可能是浏览器或服务器造成的。

    【讨论】:

      【解决方案2】:

      首先,我将重新排列代码,使其更符合 rxjs 的习惯,删除嵌套订阅并使用 pipe 并使其更具函数式风格。内联 cmets 尝试解释变化

      this.sites$.pipe(
        // this is the rxjs map operator that transform the data received from
        // the rest api into something different
        map(data => {
          // I create the obs array using the map method of JS arrays - this is 
          // NOT the rxjs map operator
          obs = data.map((element, _i) => {
             this.siteCaptureMap[element.id] = new CaptureData();
             this.siteCaptureMap[element.id].id = _i;
             return this.sitesService.getCaptureData(element.nameOrNumber, element.owner.name)
          });
          // return both the array of observables and the data
          return [data, obs];
        }),
        // concatMap makes sure that you wait for the completion of the rest api
        // call and the emission of the data fetched before you execute another
        // async operation via forkJoin
        concatMap(([data, obs]) => forkJoin(obs).pipe(
          // this is the rxjs map operator used to return not only the results
          // received as result of forkJoin, but also the data received with the
          // first rest call - I use this operator to avoid having to define 
          // a variable where to store 'data' and keep the logic stateless 
          map(results => ([data, results]))
        ))
      ).subscribe(([data, results]) => {
        // here I use forEach to loop through the results and perform
        // your logic
        results.forEach((res, _i) => this.siteCaptureMap[data[_i].id].count = res.length)
      })
      

      我觉得这个表单更符合rxjs及其功能精神,应该和你原来的表单是等价的。

      同时,我很想知道您为什么说您的代码在 forkJoin 内执行一次调用。

      顺便说一句,如果您有兴趣了解 rxjs 与 http 的常见使用模式,您可以read this article

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-10-02
        • 2022-01-20
        • 1970-01-01
        • 1970-01-01
        • 2018-12-06
        • 1970-01-01
        相关资源
        最近更新 更多