【问题标题】:Race condition in RxJS?RxJS 中的竞争条件?
【发布时间】:2021-03-12 20:21:44
【问题描述】:

我目前正在研究 RxJS 组件,该组件的作用是定期从数据库中提取一些数据,使其作为 BehaviorSubject Observable 可用并可供客户端使用。

// "Producer"
const source2 = Observable.create((observer: Observer<Map<number,any>>) => {
  this.fooService.loadFoos().then((val) => {
    observer.next(val)
    observer.complete()
  })
})

const subj = new BehaviorSubject(new Map())

interval(5000).pipe(
  switchMap(() => source2),
).subscribe(subj)


// "Client"
return subj.pipe(
  first(),        // get only the latest data, a Map with number keys for foos
  map((f: Map<number,any>) => {
    if (!f.has(fooId)) {
      throw new Error('foo not found')
    }
    return f
  }),
).toPromise()

它工作得很好,除非在极少数情况下,间隔开始以及与 switchMap f/value 一起看起来像一个空白地图。这是在使用 apache 基准测试的压力测试下重现的。

为什么会这样?上面的代码有什么问题吗?是因为switchMap的取消效果吗?

这就是失败的频率:

Concurrency Level:      100
Time taken for tests:   6.620 seconds
Complete requests:      5000
Failed requests:        1230

【问题讨论】:

    标签: rxjs


    【解决方案1】:

    这不是一个确定的答案,但评论的时间太长了:-)


    您的 BehaviorSubject 以默认值 new Map() 开头,因此在您的时间间隔第一次触发 (5000 毫秒) 和第一个 promise 调用返回之前发生的任何订阅都将收到一个空映射。

    如果您需要重播功能,但不需要默认值,您可以改用ReplaySubject

    const subj = new ReplaySubject<Map<number,any>>(1);
    

    是不是因为switchMap的取消作用

    没有。 switchMap 不会只返回一个空数组,当它“切换”时,它只会丢弃前一个可观察源的发射,并且只传播新源的发射。

    getFoos() 是否有可能返回空地图?

    注意事项:

    你的“生产者”函数基本上是from() (docs | source code),所以你可以使用:

    const source2 = from(this.fooService.loadFoos());
    

    如果你没有在任何地方打电话给subject.next,你可以完全摆脱这个话题,只需这样做:

    const subj = interval(5000).pipe(
      startWith(new Map()),
      switchMapTo(source2),
      shareReplay(1)
    );
    

    【讨论】:

    • 感谢有用的解释和提示。 “getFoos() 是否有可能返回一个空地图?”是的-不幸的是,经过一些调试,这就是导致问题的原因:\
    猜你喜欢
    • 2021-05-20
    • 2016-03-07
    • 2011-07-17
    • 2013-02-27
    • 2021-04-16
    • 1970-01-01
    • 2023-04-03
    • 1970-01-01
    • 2022-01-23
    相关资源
    最近更新 更多