【问题标题】:How to remove transient events from RxJS combineLatest with NGRX store source observable如何从 RxJS combineLatest 中删除瞬态事件与 NGRX 存储源 observable
【发布时间】:2019-12-18 17:49:50
【问题描述】:

我正在使用 TypeScript、Angular、NGRX 应用程序。我一直在编写状态可观察对象而不使用选择器——主要原因是我发现它们不如直接使用 RxJS 运算符强大。例如,不能单独使用选择器来限制事件的发射 - 而是必须使用过滤运算符。

在大多数情况下,用 observables 替换选择器没有任何问题 - observables 可以以与选择器相同的方式组成 - 除了一个例外:我不知道如何组成 observables 可能由相同的触发行动。通常,我使用 combineLatest 作为我的 goto observable composer;然而,如果两个 observables 会更新同一个 action,就会有一个短暂的更新,其中一个 observables 具有来自新状态的值,而另一个具有来自先前状态的值。

最初,我考虑使用 zip observable 创建器来代替;然而,虽然这解决了两个 observable 一起更新时的问题,但它并不能解决一个 observable 在没有另一个 observable 的情况下更新的问题——这在 NGRX 架构中是完全可能的。

然后我考虑了 auditTime(0) 运算符,它确实解决了删除瞬态更新的问题,但有新问题 1) 它会导致 observables 在稍后的事件循环中发出,这会破坏应用程序内部的一些假设(可解决,但很烦人) 2)它使各种可观察对象尽快发射,而我希望所有可观察对象在同一个存储脉冲上一起发射。从图形上看,这意味着应用程序的不同部分的渲染是交错的,而不是在同一帧上一起绘制(请注意,我们的应用程序数据量很大,并且经常需要在存储脉冲上丢帧)

最后,我编写了一个自定义操作符来组合来自同一来源的 observables

export type ObservableTuple<TupleT extends any[]> = {
  [K in keyof TupleT]: Observable<TupleT[K]>;
};

export function selectFrom<SourceT, TupleT extends any[]>(...inputs: ObservableTuple<TupleT>): OperatorFunction<SourceT, TupleT> {
  return (source$: Observable<SourceT>) => source$.pipe(
    withLatestFrom(combineLatest<TupleT>(inputs)),
    map(([, values]) => values),
  );
}

这里总结一下TypeScript中的问题(使用NGRX、RxJS、Angular的sn-ps)

interface IState {
    foo: string;
    bar: string;
}

@Injectable()
class SomeService {
    constructor(store$: Store<IState>) {
    }

    readonly foo$ = this.store$.pipe(select(state => state.foo));
    readonly bar$ = this.store$.pipe(select(state => state.bar));

    readonly composed$ = this.store$.pipe(
        selectFrom(
            this.foo$,
            this.bar$,
        ),
        map(([foo, bar]) => `${foo} - ${bar}`),
    );
}

const UPDATE_FOO = {
    type: 'update foo',
    foo: 'some updated value for foo'
};
const UPDATE_BAR = {
    type: 'update bar',
    bar: 'some updated value for bar',
};
const UPDATE_BOTH = {
    type: 'update both',
    both: 'some updated value for both foo and bar',
};

即使 selectFrom 调用相互嵌套,也可以正常工作,例如

readonly composed2$ = this.store$.pipe(
   selectFrom(
      this.composed$,
      this.foo$
   )
)

只要在composed2$之前定义commodated$,一切都会顺利;但是,我没有考虑的一个情况是在组合 $ 和组合 2 $ 之间使用 switchMap 之类的运算符时。在这种情况下,由于 compsed2$ 被 switchMap 销毁并重新创建,commodated2$ 可能在composed$ 之前触发,这会导致一切不同步

【问题讨论】:

    标签: javascript rxjs ngrx


    【解决方案1】:

    对于您尝试组合 2 个可观察对象并且仅在它们都完成发射后才发射的特定问题,您可以尝试利用:

    • queue Scheduler - 让您推迟递归调用,直到当前调用完成
    • debounce - 延迟更新直到信号到达
    • observeOn - 只监听队列调度器上的存储更新

    然后您可以执行以下操作:

    readonly queuedStore$ = this.store$.pipe(
        observeOn(queue), // use queue scheduler to listen to store updates
        share() // use the same store event to update all of our selectors
      );
    
    // use queuedStore$ here
    readonly foo$ = this.queuedStore$.pipe(select(state => state.foo));
    readonly bar$ = this.queuedStore$.pipe(select(state => state.bar));
    
    // when composing, debounce the combineLatest() with an observable
    // that completes immediately, but completes on the queue scheduler
    readonly composed$ = combineLatest(foo$, bar$).pipe(
      debounce(() => empty().pipe(observeOn(queue))));
    

    会发生什么?

    Foo 更新

    1. queuedStore$ 在queue 上安排通知
    2. 通知立即开始,因为当前没有运行任何内容
    3. foo$ 通知
    4. combineLatest 通知
    5. debounce 订阅 durationSelector
    6. durationSelector 在queue 上安排通知
    7. 未发送通知,因为队列中的操作当前正在运行
    8. 调用堆栈展开到第 1 步
    9. 队列调度程序运行 durationSelector 通知
    10. 去抖动触发并向 UI 发送更新

    栏更新

    与 Foo 更新相同

    BarFoo 更新

    1. queuedStore$ 在queue 上安排通知
    2. 通知立即开始,因为当前没有运行任何内容
    3. foo$ 通知
    4. combineLatest 通知
    5. debounce 订阅 durationSelector
    6. durationSelector 在queue 上安排通知
    7. 未发送通知,因为队列中的操作当前正在运行
    8. 调用堆栈展开到第 3 步
    9. bar$ 通知
    10. combineLatest 通知
    11. debounce 从 foo 通知中丢弃以前的值
    12. debounce 重新订阅durationSelector
    13. durationSelector 在queue 上安排通知
    14. 未发送通知,因为队列中的操作当前正在运行
    15. 调用堆栈展开到第 1 步
    16. 队列调度程序运行 durationSelector 通知
    17. 去抖动触发并向 UI 发送更新

    理论上,这会让你得到你想要的行为: - 单个更新立即应用(在下一个刻度之前) - 组合更新立即应用(在下一个刻度之前) - 组合更新忽略中间结果 - 如果您的组合 observable 使用 switch,应该仍然有效。

    注意事项

    如果您在 queue 调度程序上处理其中一个通知时调度另一个事件,则该第二个事件的通知将推迟到当前处理程序完成之后。

    【讨论】:

    • 这看起来很有希望。我设置了一个虚拟示例,它似乎具有我想要的基本属性。明天我会尝试将它集成到我的应用程序中,看看它是否没有选中任何框。非常感谢您的详细回答!顺便说一句,我认为要注意的事情很好,因为 - 通常我想在下一个状态之前处理当前状态 - 商店(很确定..)已经在 queueScheduler 上观察
    • 这种方法有一些陷阱,但总的来说它满足了我提出的所有要求。非常感谢,这真的很有帮助!
    猜你喜欢
    • 2021-02-21
    • 2018-04-08
    • 1970-01-01
    • 2018-05-02
    • 2014-09-23
    • 1970-01-01
    • 1970-01-01
    • 2023-04-07
    • 2012-03-02
    相关资源
    最近更新 更多