【问题标题】:RxJs Skip Sample on first Emit, wait for one Observable to completeRxJs 在第一次 Emit 时跳过示例,等待一个 Observable 完成
【发布时间】:2018-09-13 16:01:46
【问题描述】:

问题 1

 combineLatest(this.layerService.layersData$, this.displayService.displayData$, this.dataSource.data$,
      (layer, display, data) => ({ layer, display, data }))
      .pipe(
        skipWhile(({ layer, display, data }) =>
          _.isEmpty(layer) || _.isEmpty(display) || _.isEmpty(data)),
        takeWhile(() => this.cacheService.isDirty()),
        sample(interval(2000)),
        map(result => {
          const layerFiltered = result.layer.filter(ly => result.display.findIndex(d => d.id === ly.id) !== -1);
          return { ...result, layer: layerFiltered };
        })
  )
  .subscribe(result => {
    console.log(result);
  });

我想避免在第一次发射时采样,然后再使用采样。

第一次发出我的意思是,它能够进入地图功能。 不使用外部局部变量可以实现吗?


问题 2

ngOnInit() {
   this.displayService.displayData$.delay(500).take(1).subscribe(data =>  this.displayMeta = data);

   this.layerService.getLayerData()
     .subscribe(layers => {
       this.layers = layers;
     });
}

我希望 layerService 订阅等到 displayService 完成,我可以将 layerService 订阅逻辑放在 displayService subscribe 方法中,但这似乎不是解决问题的好方法。

我希望 this.displayService....... 代码是同步的。 我这也需要一次,而不是 take(1) 运算符。


问题 3

dirty = {};
fetchedData = {};
reportData$ = new BehaviorSubject({});

constructor(private dataSourceService: DataSourceService, private someService: SomeService) {
  const dataFetch$ = this.dataSourceService.data$
    .pipe(
      tap(dList => {
        // update dirty by comparing dList, if this.dirty has 3 keys and dList have two item then this.dirty length will be two
        this.dirty = dList.reduce((acc, et) => ({ ...acc, [et.id]: _.get(this.dirty, et.id, true) }), {});
      }),
      filter(dList => !_.isEmpty(dList)),
      map(dList => _.filter(dList, dL => this.dataSourceService.dirty[dL.id])),
      concatMap(dList => from(dList)),
      flatMap(dItem => this.someService.getDataFromApi(dItem), (item, data) => {
        return { id: item.id, data };
      }),
      tap(({ id, data }) => {
        this.fetchedData[id] = data;
        this.dirty[id] = false;
        this.dataSourceService.resetDirty(id);
      })
    );

  dataFetch$.merge(this.dataSourceService.data$)
    .subscribe(() => {
      this.fetchedData = _.pickBy(this.fetchedData, (__, key) => _.has(this.dirty, key));
      this.reportData$.next(this.fetchedData);
    });
}

即使过滤器返回 false,也应该调用 subscribe 方法。 上述方法的问题是订阅会被调用两次。

如果 dList 为空,则不调用 dataFetch$,因此调用 subscribe 一次,如果不为空,则调用两次 subscribe。


设计是如果item从this.dataSourceService.data$中逐一移除,最后this.dataSourceService.data$.length变为0,observable链不会到达subscribe,此时也让this.fetchedData = empty

由于 dataSourceService.data$ 中的项目被删除,this.fetchedData 中的相应项目应该被删除,我不知道删除了哪个项目,这就是脏标志的原因,注意第一次点击操作。 在订阅中,dirtyList 用于更新 fetchedData。

【问题讨论】:

    标签: angular rxjs rxjs5


    【解决方案1】:

    问题 1

    您可能需要考虑创建一个类似于您已经拥有的 Observable,但没有 sample 运算符。一旦你有了这样一个 Observable,你就拥有了一个基本的构建块,可以让你到达你想到达的地方。

    基本的 Observable 应该看起来像

    const basicObs = combineLatest(this.layerService.layersData$, this.displayService.displayData$, this.dataSource.data$,
          (layer, display, data) => ({ layer, display, data }))
          .pipe(
            skipWhile(({ layer, display, data }) =>
              _.isEmpty(layer) || _.isEmpty(display) || _.isEmpty(data)),
            takeWhile(() => this.cacheService.isDirty()),
            map(result => {
              const layerFiltered = result.layer.filter(ly => result.display.findIndex(d => d.id === ly.id) !== -1);
              return { ...result, layer: layerFiltered };
            })
      )
    

    然后,您可以使用 concat 运算符将basicObs 的第一个发射与所有后续发射结合起来。代码看起来像

    const firstNotification = basicObs.pipe(
      take(1)
    );
    const followingNotifications = basicObs.pipe(
      skip(1), // to avoid emitting the first element
      sample(interval(2000))
    );
    
    firstNotification.pipe(
      concat(followingNotifications)
    )
    .subscribe(result => console.log(result))
    

    问题 2

    如果您希望在执行this.layerService.getLayerData() 的订阅之前执行this.displayService.displayData$.delay(500).take(1) 的订阅,那么您可能不想尝试这样的操作

    ngOnInit() {
       this.displayService.displayData$.pipe(
          delay(500),
          take(1),
          tap(data =>  this.displayMeta = data),  // this is the side effect that you have with the first subscription
          switchMap(() => this.layerService.getLayerData())
       )
         .subscribe(layers => {
           this.layers = layers;
         });
    }
    

    这里的关键思想是使用switchMap 在第一个 Observable 发射后立即切换到第二个。不过在切换之前,我们通过tap 运算符运行嵌入在this.displayService.displayData$.delay(500).take(1) 订阅中的副作用。

    【讨论】:

    • 如果第二个问题的订阅部分很大怎么办。
    • 我又添加了一个问题
    • “订阅量很大”是什么意思?
    • 为什么不直接删除filter
    • 订阅部分是巨大的手段,如果它不平凡,并且 switchMap 不会在一轮后停止监听,因为有 take(1)
    猜你喜欢
    • 1970-01-01
    • 2021-08-28
    • 2021-11-16
    • 1970-01-01
    • 2018-03-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多