【问题标题】:RxJS forkJoin piped observablesRxJS forkJoin 管道可观察对象
【发布时间】:2020-06-09 01:11:54
【问题描述】:

我需要对服务器进行多次调用以保存一些数据,并且每次后续调用都需要从前一次调用的结果中获取一些数据。尝试使用 forkJoin,但事件顺序没有意义(至少对我而言)。我认为问题出在 .pipe() 调用上,我正在尝试修改下一次调用的输入数据。

所以我有两个问题:

  1. 为什么输出不符合我的预期
  2. 有没有办法使用 forkJoin 来完成这项工作(我知道有十几种其他方法可以解决这个问题,所以并不是真的在寻找替代解决方案)

这是一些示例代码,或StackBlitz


  let data: { [key: string]: any } = {};

  forkJoin(
      this.saveFirst(data).pipe(
        tap(_ => console.log('saveFirst pipe after')),
        tap(result => data.id = result.id)
      ),
      this.saveSecond(data).pipe(
        tap(_ => console.log('saveSecond pipe after')),
        tap(result => data.name = result.name)
      ),
  ).subscribe(result => console.log('done: data - ', JSON.stringify(data)));

...

  private saveFirst(data: { [key: string]: any }): Observable<any> {
      console.log('saveFirst: start');
      console.log('saveFirst: data - ', JSON.stringify(data));

      // replaced call to server with "of({ id: 1 })" for simplicity of example
      return of({ id: 1 }).pipe(tap(_ => console.log('saveFirst: end')));
  }

  private saveSecond(data: { [key: string]: any }): Observable<any> {
      console.log('saveSecond: start');
      console.log('saveSecond: data - ', JSON.stringify(data));

      // replaced call to server with "of({ name: 'test' })" for simplicity of example
      return of({ name: 'test' }).pipe(tap(_ => console.log('saveSecond: end')));;
  }

我期待以下输出:

saveFirst: start
saveFirst: data -  {}
saveFirst: end
saveFirst pipe after

saveSecond: start
saveSecond: data - {}
saveSecond: end
saveSecond pipe after

done: data -  {"id":1,"name":"test"}

但是得到了这个:

saveFirst: start
saveFirst: data -  {}
saveSecond: start
saveSecond: data -  {}

saveFirst: end
saveFirst pipe after
saveSecond: end
saveSecond pipe after

done: data -  {"id":1,"name":"test"}

【问题讨论】:

  • 你试过用concat替换forkJoin吗?它应该给出预期的行为。
  • 这不能用 forkJoin 完成。 forkJoin 并行执行。您正在寻找顺序执行。

标签: angular typescript rxjs


【解决方案1】:

在这种情况下,您需要使用mergeMap/switchMap

this.saveFirst(data).pipe(
          tap(_ => this.actions.push('saveFirst pipe after')),
          tap(result => data.id = result.id),
          switchMap((res)=>{
            return this.saveSecond(data).pipe(
          tap(_ => this.actions.push('saveSecond pipe after')),
          tap(result => data.name = result.name)
        );
          })).subscribe(result => this.actions.push('done: data - ' + JSON.stringify(data)));

上面的代码会产生你需要的结果。 forkJoin 用于我们想要发出多个请求并且只关心最终结果的情况。

分叉Stackblitz

【讨论】:

    【解决方案2】:

    如果执行顺序很重要,则需要使用 concatMap 而不是 forkjoin

    this.saveFirst(data).concatMap(() => this.saveSecond(data)).subscribe()
    

    https://www.learnrxjs.io/learn-rxjs/operators/transformation/concatmap 其中提到在顺序很重要时使用 concatMap。

    【讨论】:

      【解决方案3】:

      我一直在为同样的问题苦苦挣扎,我找到的唯一解决方案是不通过管道传输 observables,而是在订阅 forjoin 后处理所有响应。

      它将并行运行两个任务,而不是像上述解决方案那样依次运行。 我将您的处理从“点击”转移到新的处理方法,并在 forkJoin 订阅完成后调用它们。

      yourMethod()
      {
          let data: { [key: string]: any } = {};
      
          forkJoin(
              this.saveFirst(data),
              this.saveSecond(data)
          ).subscribe(([res1, res2]) =>
          {
              this.handleSuccess_1(res1, data);
              this.handleSuccess_2(res2, data);
              console.log('done: data - ', JSON.stringify(data));
          });
      }
      
      handleSuccess_1(res, data)
      {
          console.log('saveFirst pipe after');
          data.id = res.id;
      }
      
      handleSuccess_2(res, data)
      {
          console.log('saveSecond pipe after');
          data.name = res.name;
      }
      

      【讨论】:

        猜你喜欢
        • 2016-10-29
        • 2019-10-31
        • 1970-01-01
        • 2021-11-05
        • 2016-06-18
        • 1970-01-01
        • 2022-11-25
        • 2021-07-10
        • 1970-01-01
        相关资源
        最近更新 更多