【问题标题】:Angular Observable need to complete firstAngular Observable 需要先完成
【发布时间】:2020-11-05 19:28:26
【问题描述】:
@Input() list: string[];

ngOnInit() : void {
   this.valueMap = new Map<any, any>();
   this.getDataFromService();
   this.buildContainer();
}

private getDataFromService(): void {
  this.list.forEach(value-> {
      this.fetchService(value).subscribe(data ->{
          this.valueMap.set(value,data);
          }
       )
   })
}

private buildContainer(): void {
   console.log(this.valueMap.size); // shows always 0 even when service returns the data 
 }

现在我必须在方法 buidlContainer() 中使用这个 valueMap , 因此我需要先从服务中获取完整的地图。当我在 buildConatainer() 中使用该地图时,它显示我未定义。

我知道这是一些异步调用问题。此外,我不能为 .subscribe() 中的每个值调用方法 buildContainer(),因为这不是更好的主意。 所以,我必须先计算地图,然后再进一步处理..

感谢任何帮助,我无法将服务从返回 Observable 修改为返回 Promise

我想做如下操作

private buildContainer(): void {
   for([key,data] of this.valueMap) {
      -----some code----
    }
 }

【问题讨论】:

    标签: angular promise async-await rxjs observable


    【解决方案1】:

    ngOnInit 中移除 buildContainer 并调用 subscribe

    中的方法
     private getDataFromService(): void {
        this.list.forEach((value, index) -> {
            this.fetchService(value).subscribe(data ->{
                this.valueMap.set(value,data);
                       }
             )
             if(this.list.length-1 == index){
              this.buildContainer(); // call here
             }
         })
      }
    

    【讨论】:

    • 我不能调用地图的每个值或服务中的每个数据...我需要调用该方法..只有当所有对服务的调用都完成时...意味着我已经准备好地图
    • @surendrakumar 由于 buildContainer 发生在订阅之外,它在异步操作完成之前执行。
    【解决方案2】:

    只需使用forkJoin 等待所有可观察对象完成。 forkJoin 将所有可观察对象捆绑为一个。这个 observable 可以返回并在 ngOnInit 中订阅它并调用 buildContainer

     @Input() list: string[];
    
      ngOnInit(): void {
        this.valueMap = new Map<any, any>();
        this.getDataFromService().subscribe(_ => this.buildContainer());
      }
    
      private getDataFromService(): Observable<any> {
        return forkJoin(
          this.list.map(value =>
            this.fetchService(value).pipe(
              tap(data => this.valueMap.set(value, data))
            )
          )
        );
      }
    
      private buildContainer(): void {
        console.log(this.valueMap.size);
      }
    

    如果您不在tap 运算符中填充valueMap,而是使用您在订阅函数中获得的结果,代码会更加简洁。

    【讨论】:

    • 这给了我错误You provided 'undefined' where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
    • 我的代码是private getDataFromService(): Observable&lt;any&gt; { return forkJoin( this.sampleMap.forEach((value,key) =&gt; { this.fetchService(value).pipe( tap(data =&gt; this.valueMap.set(key,data)) ) }) ); }
    • sampleMap 是具有 [key,value] 对的地图。我已将 this.list 更改为 this.sampleMap
    • @DeepakSharma 你必须使用map 而不是forEach 来返回一个可观察的数组而不是undefined。
    【解决方案3】:

    这是在代码中引入反应式范式的缺点之一(或优点,取决于你如何看待它)。单点响应式过程会逐渐部分或完全渗入代码的其他部分。

    因此,我建议您也将valueMap 设为响应式,并使其根据getDataFromService() 服务中的更改进行响应。一种方法是使 valueMap 变量成为 RxJS ReplaySubject 的缓冲区为 1。因此它会缓冲(或保持)最后一个推送给它的值并在订阅时立即发出它。

    试试下面的

    import { forkJoin, ReplaySubject, Subject } from 'rxjs';
    import { take, takeUntil } from 'rxjs/operators';
    
    valueMap: ReplaySubject<Map<any, any>> = ReplaySubject<Map<any, any>>(1);
    completed$ = new Subject<any>();
    
    @Input() list: string[];
    
    ngOnInit(): void {
      this.valueMap = new Map <any, any>();
      this.getDataFromService();
      this.buildContainer();
    }
    
    private getDataFromService(): void {
      forkJoin(this.list.map(value => this.fetchService(value))).pipe(
        takeUntil(this.completed$)             // <-- close subscription upon component `ngOnDestroy` hook trigger
      ).subscribe(
        data => {
          let map = new Map<any, any>();
          data.forEach((item, i) => {
            map.set(this.list[i], item);
          });
          this.valueMap.next(map);
        }
      );
    }
    
    private buildContainer(): void {
      this.valueMap.asObservable().pipe(
        take(1)               // <-- emit only the first notification and complete
      ).subscribe(
        data => {
          console.log(this.valueMap.size);
        }
      );
    }
    
    ngOnDestroy() {
      this.completed$.next();
      this.completed$.complete();
    }
    

    我还使用 RxJS 的 forkJoin() 函数来组合多个 observables 和 takeUntil 运算符在组件被销毁时关闭订阅。

    工作示例:Stackblitz

    在示例中,我使用jsonplaceholder API 在fetchService() 函数中模拟了一些HTTP 调用。

    注意:

    1. forkJoin() 只有在所有可观察对象都完成时才会发出。如果您正在处理数据流,则必须根据您的要求将其替换为 combineLatestzip

    2. 有一个明显的问题是,如果buildContainer() 函数被快速连续调用多次,每次都会触发单独的订阅。

    【讨论】:

    猜你喜欢
    • 2018-12-20
    • 1970-01-01
    • 1970-01-01
    • 2018-09-18
    • 2020-04-26
    • 2019-06-03
    • 2018-12-18
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多