【问题标题】:How to modify observable stream by another observable in RXJS? Angular如何通过 RXJS 中的另一个 observable 修改 observable 流?角
【发布时间】:2019-01-04 14:59:11
【问题描述】:

当标志 areShipsExpanded$ 将更改时,我想更改 ships$ 中的嵌套标志。 我怎样才能做到这一点。这是我目前所拥有的:

areShipsExpanded$: Observable<boolean>;
ships$: Observable<Ship>;

constructor() {
   this.ships$ = this.shipsDataSource.getData().pipe(shareReplay());
}

ngOnInit(): void {
    this.areShipsExpanded$.pipe(
        map(flag => {
            if (flag) {
                this.ships$.pipe(map(s => s.items.forEach(r => r.isExpanded = flag)));
            }
        })
    ).subscribe();
}

不幸的是,ships$ 似乎没有被修改,或者我应该以某种方式刷新这个 ships$ observable,因为我以这种方式在我的视图中使用它:&lt;ng-container *ngFor="let ship of (ships$ | async)?.items"&gt;


@编辑

我是这样做的,是正确的,还是不好的做法?

this.areShipsExpanded$.pipe(
    map(flag => {
        if (flag) {
            this.ships$ = this.ships$.pipe(map(s => {
                s.items.forEach(r => r.isExpanded = flag);
                return s;
            }));
        }
    })
).subscribe();

@EDIT2

我用combineLatest,现在应该可以了吗?

shipsData$: Observable<Ship>;
ships$: Observable<Ship>;

constructor() {
   this.shipsData$ = this.shipsDataSource.getData().pipe(shareReplay());
}

this.ships$ = combineLatest(this.areShipsExpanded$, this.shipsData$).pipe
    map(([shipsExpanded, ships]) => {
        ships.items.forEach(r => r.isExpanded = shipsExpanded);
        return ships;
    }),
    share()
);

但是combineLatest 的问题是,在开始时函数将运行 2 次,因为两个 observables 都会发出值。是否有任何替代运算符的行为类似于combineLatest - 所以当任何可观察对象发出一个值时,从每个发出最新值,但仅当所有可观察对象都准备好时。避免在开头重复。

【问题讨论】:

  • 扩展,你想更新你的船吗?你的模型似乎没有处理这个问题。你应该有一个 Observable 和一个船舶列表。在这里,你有两个 observables。这意味着您只能在您的舰船列表更改时更新您的舰船。
  • 在 RxJS 中搜索组合流,您可以组合 areShipsExpandedships 并监听两者的变化,然后映射结果以创建最终的 ship$。使用哪种组合运算符取决于您的要求,寻找combineLatestzip等。
  • @sabithpocker 请检查编辑
  • 您在编辑中所做的是覆盖this.ships$ 属性,但原始this.ships$ 的订阅者将继续订阅原始this.ships$ 而不是新的。跨度>
  • 也许 this.areShipsExpanded$.pipe(withLatestFrom(this.shipsData$),map(//...),share()) 云帮助你(只有在 this.areShipsExpanded 时才会触发流$ 正在发射,如果 this.shipsData$ 发射了一个值)

标签: angular rxjs rxjs6


【解决方案1】:

希望对您有帮助:

const fetchData$ = this.shipsDataSource.getData().pipe(shareReplay());

this.ships$ = merge(
    this.areShipsExpanded$.pipe(
        withLatestFrom(fetchData$),
        map(([shipsExpanded, ships]) => 
            // we could try to favor immutability
            ships.items.map(r => {...r, isExpanded:shipsExpanded})), 
    ), // ships after update
    fetchData$ // initial ships
).pipe(share());

【讨论】:

    猜你喜欢
    • 2019-01-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-21
    • 2017-11-19
    相关资源
    最近更新 更多