【问题标题】:Notify when all observables are completed当所有可观察对象完成时通知
【发布时间】:2017-07-31 21:48:27
【问题描述】:

我有一个可观察的数组。每个 observable 都是一个需要通过 HTTP 请求上传的文件。

我想做两件事:

  1. 对于每个上传的文件,我必须发出将在父组件中处理的事件。
  2. 上传所有文件后,我会返回通知所有可观察对象已完成。

我从这段代码开始:

this.isUploading$ = Observable.of(true);

const source$ = Observable.from(files)
    .concatMap(file => this.uploadSingleFile(file));

source$.subscribe(data => this.onSuccess.emit(data));

可观察到的isUploading$ 表示上传过程已启动并在 HTML 中使用。我想根据isUploading$ 的状态显示微调器。

Observable source$ 表示文件上传操作,因为 this.uploadSingleFile(file) 方法返回 Observable<Response>

此代码将上传所有文件并在每个完成的 observable 上执行 this.onSuccess.emit(data)

问题是:当一切都完成后,如何将this.isUploading$ 设置为false

更新:

我想实现这样的目标,但没有在 onCompleted 函数中分配变量。示例:

source$.subscribe(data => this.onSuccess.emit(data),
                  err => console.log(err),
                  () => this.isUploading = false );

我想检索两个 observables 并在以后随时订阅它们。

【问题讨论】:

    标签: angular typescript rxjs


    【解决方案1】:

    如果你想同时运行所有这些 observables,concatMap 不会做你想做的事。您可以使用 doforkJoin 运算符来完成这两件事。

    forkJoin 基本上是 Promise.all 用于 observables。

    do 是一种利用正在传递和可观察的值的方法,因此您可以对它们“做”一些事情(副作用)。

    所以:

    const arrayOfObservables = [observable1, observable2, observable3];
    
    Observable.forkJoin(
      arrayOfObservables.map((obs, i) => obs.do({
        next(value) { console.log(`Observable ${i} emits: ${value}`); },
        complete() { console.log(`Observable ${i} is complete`); }
      }))
    )
    .subscribe(values => console.log('everything done with', values))
    

    【讨论】:

      【解决方案2】:

      为什么需要 isUploading 成为可观察对象?我会这样做:

      this.isUploading = true;
      const source$ = Observable.from(files)
          .concatMap(file => this.uploadSingleFile(file));
      
      source$.subscribe(data => this.onSuccess.emit(data),
                        err => console.log(err),
                        () => this.isUploading = false );
      

      【讨论】:

      • 我不想在 observables 中改变 this.isUploading 的状态。如果可能的话,我想转换成可观察的。除了在onCompleted 函数中分配 false 之外,还有其他方法吗?
      • 另外,在使用OnPushchangedetection-strategy 时,这会给您带来一些麻烦。
      【解决方案3】:

      您可以简单地使用BehaviorSubject

      public isUploading$ = new BehaviorSubject<boolean>(true);
      

      ...

      this.isUploading$.next(false);
      this.isUploading$.complete();
      

      并使用forkJoin操作符等待全部完成:

      const allSources$ = Observable.forkJoin(source$);
      allSources$.subscribe(data => {
          this.isUploading$.next(false);
          this.isUploading$.complete();
      }
      

      这会给你想要的结果。

      【讨论】:

      • 我必须在哪里打电话给this.isUploading$.next(true);?我怎么知道所有文件都已上传?
      • 在您拨打this.onSuccess.emit的同时拨打。
      • this.onSuccess.emit 为每个 observable 执行。我只想在所有操作完成后设置false
      • 啊,好吧,误解了你的例子,使用forkJoin 操作符来等待所有的 Observables 完成。 (reactivex.io/rxjs/class/es6/…)
      • 已将其添加到我的答案中。
      猜你喜欢
      • 2021-05-01
      • 1970-01-01
      • 2019-04-23
      • 1970-01-01
      • 1970-01-01
      • 2022-10-06
      • 2019-03-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多