【问题标题】:Queue http calls in rxjs在 rxjs 中排队 http 调用
【发布时间】:2018-10-26 09:29:35
【问题描述】:

我正在开发一个会话服务来检查身份验证令牌是否已过期。如果是,它会调用刷新令牌。在此请求期间,所有传入请求都应排队,并在请求完成后发出。之后,所有传入的请求都可以在没有队列的情况下通过,直到令牌再次过期。我为此画了一个大理石图:

1. ---a---b---c--d-----e--
2. -t-------f------t------
3. ---a---b---------cd-e--

我将 1. 命名为 incoming$ Observable,2. 命名为 valve$ - 如果为 true,则请求可以通过,如果为 false,则应排队。当它变为真时,队列被触发。

到目前为止我做了什么?我认为这应该通过添加一个名为receiver$ 的中间 Observable 来完成,它会根据valve$ 改变其值。当valve$为真时,它只返回一个简单的主题,如果它是假的,它返回一个能够记录值的主题。

receiver$ = valve.pipe(
  map((value) => {
    if (value) {
      return new Subject();
    } else {
      return (new Subject()).pipe(
        shareReplay(),
      );
    }
  })
);

然后在incoming$ 中获得的每个新值都应该添加到recevier$ 中的当前可观察对象中:

incoming$.pipe(
  combineLatest(receiver$),
).subscribe((incomingValue, recevier) => {
  recevier.next(incomingValue);
});

这是我无法理解的部分。每当阀门变为真时,我需要来自receiver$ 的最后两个值。倒数第二个将持有队列,最后一个将持有活动主题。通过合并它们,我可以实现我的目标。我不知道如何实现这一点以及如何管理订阅。此外,对于这样一个看似简单的用例,这看起来过于复杂。

实现此行为的最佳方式是什么?

【问题讨论】:

  • 大约一年前我使用 egghead.io 用 RxJs 做一些高级的东西。 (大部分我现在都忘记了——因为我在做其他事情!)也许看看那个或尝试使用一些谷歌功夫来追踪作者的博客。

标签: javascript rxjs angular-httpclient


【解决方案1】:

您可以通过使用concatMap 来完成此操作,该concatMap 基于值形式valve$ 合并两个不同的流。请注意,这要求valve$incoming$ 都与share() 共享。

valve$
  .pipe(
    concatMap(v => v
      ? incoming$.pipe(takeUntil(valve$))
      : incoming$
        .pipe(
          takeUntil(valve$),
          bufferCount(Number.POSITIVE_INFINITY),
          mergeAll(),
        )
    ),
  )
  .subscribe(console.log)

现场演示:https://stackblitz.com/edit/rxjs6-demo-d3bsxb?file=index.ts

【讨论】:

  • 我也想到了concatMap,但问题是如果incoming$ 为真,它会将所有请求排队,这样不等待其他人完成会更有效
  • incoming$ 永远不是 trueconcatMap 不会排队任何请求。
【解决方案2】:

您可以按照这些思路考虑解决方案。

首先,您创建一个主题,通过该主题发出您想要发出的所有请求

const requests$ = new Subject<Observable<any>>()

然后你创建一个主题,通过它你可以传达阀门的状态,即你是可以立即执行请求还是必须缓冲它

const valve$ = new Subject<boolean>();

现在您可以创建一个仅在 valve 打开时传递请求的流,即如果 valve$ 发出的最后一个值是 true

const openStream$ = valve$.pipe(
  switchMap(valve => {
    if (valve) {
      return requests$;
    } else {
      return empty();
    }
  })
);

您还可以创建一个流,在 valve 关闭时缓冲所有请求

const bufferedStream$ = requests$.pipe(
  bufferToggle(valve$.pipe(filter(valve => !valve)), () => valve$.pipe(filter(valve => valve))),
  mergeMap(bufferedCalls => bufferedCalls)
)

现在您所要做的就是将merge openStream$bufferedStream$subscribe 发送到结果流,就像这样

merge(openStream$, bufferedStream$).pipe(
  mergeMap(request => request)
)
.subscribe(httpCallResult => {// do stuff})

我已经用以下数据测试了这个解决方案,用字符串的 Observables 模拟真实的 http 调用

const requests$ = new Subject<Observable<string>>();
setTimeout(() => {requests$.next(of('A'))}, 50);
setTimeout(() => {requests$.next(of('B'))}, 60);
setTimeout(() => {requests$.next(of('C'))}, 100);
setTimeout(() => {requests$.next(of('D'))}, 110);
setTimeout(() => {requests$.next(of('E'))}, 130);
setTimeout(() => {requests$.next(of('F'))}, 250);
setTimeout(() => {requests$.next(of('G'))}, 260);
setTimeout(() => {requests$.next(of('H'))}, 300);
setTimeout(() => {requests$.next(of('I'))}, 310);
setTimeout(() => {requests$.next(of('L'))}, 330);


const valve$ = new Subject<boolean>();
setTimeout(() => {valve$.next(true)}, 30);
setTimeout(() => {valve$.next(false)}, 80);
setTimeout(() => {valve$.next(true)}, 120);
setTimeout(() => {valve$.next(false)}, 200);
setTimeout(() => {valve$.next(true)}, 290);

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-03-06
    • 2018-11-24
    • 1970-01-01
    • 1970-01-01
    • 2018-01-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多