【问题标题】:How to delay repeated WebClient get request如何延迟重复的 WebClient 获取请求
【发布时间】:2019-02-08 11:58:15
【问题描述】:

我正在使用 Spring 5 WebClient 从 REST api 重复获取正在运行的进程的某些状态。

here 的帮助下,我现在找到了这个解决方案:

webClient.get().uri(...).retrieve.bodyToMono(State.class)
          .repeat()
          .skipUntil(state -> stateFinished())
          .limitRequest(1)
          .subscribe(state -> {...});

虽然这有效,但获取请求的触发率非常高。将请求速率限制为每秒 1 个请求的正确方法是什么?

我尝试使用delayElements(Duration.ofSeconds(1)),但这只会延迟结果,而不是请求本身。

【问题讨论】:

    标签: spring-webflux project-reactor


    【解决方案1】:

    您可以将repeatWhen 运算符与您的伴侣Publisher 的自定义实现一起使用

    Mono.just("test")
            .repeatWhen(longFlux -> Flux.interval(Duration.ofSeconds(1)))
            .take(5)
            .log()
            .blockLast();
    

    或使用reactor-addons 中的Repeate 函数

    Mono.just("test")
            .repeatWhen(Repeat.times(Long.MAX_VALUE)
                    .fixedBackoff(Duration.ofSeconds(1)))
            .take(5)
            .log()
            .blockLast();
    

    【讨论】:

      【解决方案2】:

      在您的情况下,还有另一个小解决方法,您可以使用 Flux 作为限制器来压缩每个呼叫。

      .zipWith(Flux.interval(Duration.of(1, ChronoUnit.SECONDS)))
      

      虽然我会认为 delayElements() 可以工作,但也许你没有把它放在你的 Webclient 堆栈的正确阶段。

      【讨论】:

        【解决方案3】:

        您正在使用delayElements 告诉我您将它放在重复之后。您要延迟的是对 WebClient 的订阅。

        webClient
              .get()
              .uri(...)
              .retrieve
              .bodyToMono(State.class)
              .delaySubscription(Duration.ofSeconds(1)) //Just add this before the repeat
              .repeat()
              .skipUntil(state -> stateFinished())
              .limitRequest(1)
              .subscribe(state -> {...});
        

        这样做可以确保在第 n 个请求的响应和第 n+1 个请求的触发之间有一秒钟。如果您需要固定的调用频率而不管每个请求响应所花费的时间,请按照 Roman 的建议使用 Flux.interval 包装您的代码。

        【讨论】:

        • 这也会延迟第一个请求。
        【解决方案4】:

        您的问题的替代解决方案

        Flux.interval(Duration.ZERO, Duration.ofSeconds(1))
                .onBackpressureDrop()
                .concatMap(i -> webClientCall(...), 1)
                //or flatMap() if you want send request each second
                .filter(state -> stateFinished(state))
                .next()
                .timeout(Duration.ofSeconds(...))
                //
                .subscribe(state -> {...});
        

        但请记住,如果您自己订阅(而不是 Spring),那么 reactor 订阅者上下文不会传播到您的请求(没有安全上下文、侦探等...)

        【讨论】:

          猜你喜欢
          • 2015-01-07
          • 2019-11-28
          • 2020-05-11
          • 2013-01-08
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2017-08-12
          • 2021-01-08
          相关资源
          最近更新 更多