【问题标题】:How to implement polling with backpressure in RXJS如何在 RXJS 中实现带背压的轮询
【发布时间】:2018-04-05 20:06:58
【问题描述】:

我想轮询一个响应速度较慢的 HTTP API,因此我不想同时对该 API 进行多次调用。

我想做的一个例子可能是:

const interval = Rx.Observable.interval(250).take(5); // Poll every 250ms

function simulateMaybeSlowHttpCall() {
  return interval.delay(500).take(1); // The service takes 500ms to answer
}

 interval
   .mergeMap(val =>simulateMaybeSlowHttpCall().map(x => val), 1) // max concurrent is 1
   .subscribe(val => console.log(val));

在这里,此代码将显示 1 2 3 4 5

但我不想做无用的电话。 以上代码运行时间为 250*5 = 1250 ms,1 次调用耗时 500ms,所以我想显示:

1 3 5

所以我的问题是:将并发设置为1(或任何其他值)时,如何丢弃所有未立即完成的调用?

JsFiddle:https://jsfiddle.net/zra3zxhs/63/

【问题讨论】:

  • 听起来你应该使用exhaustMap而不是mergeMap
  • 它似乎做我想做的事。您可以将其发布为答案,这样我会将其标记为已接受吗?

标签: javascript rxjs observable reactive-programming


【解决方案1】:

使用并发为 1 的 mergeMap 等效于 concatMap。事实上,concatMap is implemented 就是这样。这就是您示例中的每个间隔都会影响 HTTP 请求的原因:它们被排队。

如果您希望避免在 HTTP 请求待处理时发起或排队,您可以使用exhaustMap

interval
  .exhaustMap(val => simulateMaybeSlowHttpCall().map(x => val))
  .subscribe(val => console.log(val));

当使用exhaustMap 时,它接收到的任何next 通知都会被忽略,直到内部可观察对象(HTTP 请求)完成。

【讨论】:

    猜你喜欢
    • 2020-04-16
    • 2018-03-18
    • 1970-01-01
    • 1970-01-01
    • 2017-10-23
    • 2019-11-22
    • 1970-01-01
    • 2018-10-08
    • 1970-01-01
    相关资源
    最近更新 更多