【问题标题】:Why are the timeout(Duration timeout) of both Mono/Flux fired based on accumulated time period为什么基于累积时间段触发 Mono/Flux 的超时(持续时间超时)
【发布时间】:2021-07-03 09:12:36
【问题描述】:

我的情况如下:

我有两个 web 请求,分别名为 request1 和 request2,request2 的输入来自 request1 的输出。现在我想为这两个请求设置超时。理想情况下,request1 的时间成本为 2s,request2 的时间成本为 3s。所以我想将request1的超时时间设置为3s,将request2的超时时间设置为4s。正如 Mono#timeout 的文档所说,我认为这是可以做到的。但不幸的是,第二次超时是通过累加计算出来的。所以我对this mono的含义感到困惑。

Mono#timeout(Duration timeout)(https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#timeout-java.time.Duration-)的文档

public final Mono<T> timeout(Duration timeout)
Propagate a TimeoutException in case no item arrives within the given Duration.
Parameters:
timeout - the timeout before the onNext signal from this Mono
Returns: a Mono that can time out

我的案例示例代码:

Mono<String> startMono = Mono.just("start");
    String result = startMono
        .map(x -> {
          log.info("received message: {}", x);
          try {
            Thread.sleep(2000);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          return "#1 enriched: " + x;
        })
        .timeout(Duration.ofSeconds(3))
        .onErrorResume(throwable -> {
          log.warn("Caught exception, apply fallback behavior #1", throwable);
          return Mono.just("item from backup #1");
        })
        .map(y -> {
          log.info("received message: {}", y);
          try {
            Thread.sleep(3000);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          return "#2 enriched: " + y;
        })
        .timeout(Duration.ofSeconds(4))
        // there is no timeoutException thrown if I set the second timeout to 6s (6s > 2s + 3s)
//        .timeout(Duration.ofSeconds(6))
        .onErrorResume(throwable -> {
          log.warn("Caught exception, apply fallback behavior #2", throwable);
          return Mono.just("item from backup #2");
        })
        .block();
    log.info("result: {}", result);

以上代码抛出异常:

16:46:51.080 [main] INFO  MonoDemo - received message: start
16:46:53.095 [elastic-2] INFO  MonoDemo - received message: #1 enriched: start
16:46:55.079 [parallel-1] WARN  MonoDemo - Caught exception, apply fallback behavior #2
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 4000ms in 'flatMap' (and no fallback has been configured)
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:288) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.doTimeout(FluxTimeout.java:273) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at reactor.core.publisher.FluxTimeout$TimeoutTimeoutSubscriber.onNext(FluxTimeout.java:395) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:117) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) [reactor-core-3.3.10.RELEASE.jar:3.3.10.RELEASE]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:834) [?:?]
16:46:55.095 [main] INFO  MonoDemo - result: item from backup #2

【问题讨论】:

    标签: java project-reactor


    【解决方案1】:

    timeout 运算符测量从订阅到第一个信号到达的时间。更多关于这个here

    如果您只想在第二个操作上应用超时,那么您需要将timeout 运算符放在只有第二个请求在范围内的位置。请参阅以下内容:

    public void execute() {
        firstRequest()
            .onErrorResume(throwable -> secondRequest())
            .onErrorReturn("some static fallback value if second failed as well")
            .block();
    }
    
    private Mono<String> firstRequest() {
        return Mono.delay(Duration.ofSeconds(2))
            .thenReturn("first")
            .timeout(Duration.ofSeconds(3));
            // additional mapping can be done here related to first request
    }
    
    private Mono<String> secondRequest() {
        return Mono.delay(Duration.ofSeconds(3))
            .thenReturn("second")
            .timeout(Duration.ofSeconds(4));
            // additional mapping can be done here related to second request
    }
    

    通过将 timeout 运算符移动到私有方法中,我们确保只测量那些特定 Monos 的持续时间,而不是整个链。

    【讨论】:

    • 嗨,马丁。感谢您的回答。我可以从你的回答中得到你的观点。但我最喜欢的答案是示例代码的整个流程。例如:onNext 信号是从哪里来的,它是如何在整个流程中转换的,它是什么时候被处理并从序列中删除的?
    • @AlvinYueChao 我做了一些更改,使其更像您的代码示例。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-01-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多