【问题标题】:Using rxjs how can I queue HTTP requests such that it waits for an existing request to complete and then sends only the most recent request?使用 rxjs 如何对 HTTP 请求进行排队,使其等待现有请求完成,然后仅发送最新请求?
【发布时间】:2020-01-03 13:17:51
【问题描述】:

我有一个 observable 可以跟踪表单的变化。当表单更新时,它应该通过向服务器发送 HTTP PUT 请求来自动保存。我想要这种行为,以便在进行更改时发送第一个 HTTP 请求。对于任何后续更改,在前一个请求完成之前不应发送任何请求。当上一个请求完成后,我想只发送最新的更新。

我曾考虑过使用排气映射,例如formChanges$.pipe(exhaustMap(data => sendHttpUpdate(data))); 因为这会给我在请求进行时忽略所有更新的行为。但是,所有数据在执行过程中都会丢失,因此我需要重新触发 formChanges$ 可观察对象以发送最新更新,否则它将被忽略。

我也考虑过 concatMap 例如formChanges$.pipe(concatMap(data => sendHttpUpdate(data))); 因为这会等待上一个请求完成,但会按顺序发送所有更新,而不仅仅是最新的。

我想使用纯 rxjs 解决方案,即使用 Observables、Subjects 和运算符,而不使用变量设置状态。我是否缺少可以实现此目的的明显运算符或运算符组合?

这是我迄今为止最好的尝试:

    const $send = new Subject();

    formChanges$.pipe(
        sample($send),
        switchMap(this.fakeHttpRequest),
      )
      .subscribe(handleResponse);



    formChanges$.pipe(first())
      .subscribe(() => $send.next());

其中 fakeHttpRequest 是一个函数,它接受值 formChanges$ 并返回 HTTP 请求的 Observable。

这并不令人满意,因为我必须在第一次更改表单后手动触发第一次发送,我觉得这并不能很好地传达代码的意图。

【问题讨论】:

  • 所以,只是重申问题 - 您需要缓冲连续更新直到当前 Web 请求完成,然后跳过除最后一个缓冲更新之外的所有更新?
  • 是的,就是这样

标签: angular rxjs reactive-programming


【解决方案1】:

我不敢相信我花了 9 个月的时间才弄清楚这一点,但万一其他人为此大发雷霆:

我意识到没有一个映射运算符(concatMap、exhaustMap)完全符合要求,因此您需要处理映射,然后自己订阅哪些值。解决方案是将每个表单值映射到可观察的 HTTP 请求(尚未订阅,因此没有请求)。然后将此 observable 作为参数传递给throttle。 Throttle 接受一个 observable,它订阅并忽略来自原始源的值,直到完成。 Throttle 还接受一个配置参数,允许您定义前导值和尾随值的行为 - 使其非常灵活。对于这种情况,我们需要两者,因为我们想知道第一个请求的结果和最后一个缓冲请求的结果。

从这里我们现在有了一个包含我们想要的所有源值的可观察对象,即请求数据。但是,我们想要响应数据!简单地再次订阅最终会导致重复的 HTTP 调用(可能有不同的结果)。不要害怕,因为这是shareReplay 来救援的地方。如果我们通过 shareReplay 将原始请求 observable 管道化,那么后续订阅 observable 现在只会得到原始响应,而不是进行新的调用。由于我们有一个 observable 的 observables,我们将要使用 mergeAll 运算符将其展平。我们现在可以订阅它以获取我们的 HTTP 请求的结果。

把这一切放在一起:

formChanges$.pipe(
  map(formValues => this.fakeHttpRequest(formValues).pipe(shareReplay(1)))
  throttle(x => x, {leading: true, trailing: true}),
  mergeAll(),
).subscribe((result) => console.log(result));

根据内幕消息,我假设您关心结果。如果这不是真的,那么这会更简单。我们可以简单地让限制发送请求,我们不必担心重复请求:

formChanges$.pipe(
  map(formValues => this.fakeHttpRequest(formValues)))
  throttle(x => x, {leading: true, trailing: true}),
).subscribe();

【讨论】:

    【解决方案2】:

    嗯,如果我理解的话,这样的事情可能会奏效:

    changeStarted: Subject<boolean>; // Keep track if save started;
    changeComplete: Subject<boolean>; // Keep track if save finished;
    
    constructor() {
      this.changeStarted =  Subject<boolean>(); 
      this.changeComplete =  Subject<boolean>(); 
      
      formChanges$.pipe(
        bufferToggle(this.changeStarted, () => this.firstCall ? interval(1) : this.changeComplete) // once change has started wait till change is completed.
      ).subscribe(x => {
        this.changeStarted.next(true) //signal to start buffering.
        // Make the http request
        this.fakeHttpRequest.subscribe(() => this.changeComplete.next(true));
    
    }

    【讨论】:

    • 这不起作用。 bufferToggle 的第二个参数应该是一个返回 observable 的函数。即使那样,我也看不出这是如何工作的,因为对于非并发请求(例如第一个请求),缓冲区必须在请求发送之前打开和关闭。话虽如此,使用缓冲区是有希望的,并且可能会导致解决方案,尽管我还没有想出一个可行的例子
    • 对不起,它应该是一个函数 () =&gt; this.changeComplete 应该可以完成这项工作;我已编辑以添加更正的代码。至于第二个查询,您有点 hack,但您可以稍微扩展上述函数并使用参数来表示第一次调用 () =&gt; this.firstCall ? interval(1) : this.changeComplete
    • 这适用于第一个,但不适用于后续的非并发更改。想象一下,我进行了更改,请求到达服务器并完成。现在我进行第二次更改。前一个缓冲区已经关闭,它不是第一次更改,因此不会再次关闭。我的第二个更改没有发送到服务器
    • 啊需要订阅tap(y =&gt; this.changeComplete.next(true)) 的 httpRequest 才能被称为抱歉。再次做出改变
    • 我注意到了这个遗漏并添加了它。它仍然不起作用。您可以尝试通过添加类似这样的内容来测试它 - 将 fakeHttpRequest 定义为:` fakeHttpRequest(x: number) { return of(x * 10).pipe(delay(2000)); } ` 为表单更改添加一些值:` formChanges$.next(1); formChanges$.next(2); formChanges$.next(3); of(4).pipe(delay(5000)).subscribe(x => formChanges$.next(x)); ` 同时记录订阅 fakeHttpRequest 的值。现在预期的输出应该是 10、20 和 40。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-09-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多