【问题标题】:Throw an exception if a Reactor Flux doesn't complete in a set time如果 Reactor Flux 未在设定的时间内完成,则抛出异常
【发布时间】:2021-06-04 18:34:29
【问题描述】:

我有一个可能长时间运行的 Flux,我想在经过一定时间后停止它。我找到了几种执行此操作的方法,但是我正在努力解决的是如何判断 Flux 超时而不是自然完成。

示例(非常简单)代码:

Flux.range(0, 10000)
        .take(Duration.ofMillis(1))
        .doOnNext(System.out::println)
        .collectList()
        .block();

我想要的是这样的:

Flux.range(0, 10000)
        .take(Duration.ofMillis(1))
        .doOnNext(System.out::println)
        .doOnError(t -> {
            if (t instanceof TimeoutException) {
                System.out.println("I timed out");
            }
        })
        .collectList()
        .block();

但是take 似乎没有通知错误;我看到的只是terminatecomplete 信号,如果我不在 Flux 中包含 take 并让它自然完成,我会得到。

我已经简要查看了确实会引发异常的 timeout 运算符,但是 timeout 看起来只有在 Flux 在特定时间内没有发出单个元素时才会引发异常,而不是如果整个 Flux 没有在特定时间内完成。

有没有人有任何关于他们如何解决这个问题的提示或示例?

提前致谢!

【问题讨论】:

    标签: java project-reactor


    【解决方案1】:

    更新

    您可以使用takeUntilOther 运算符在给定时间后发出错误信号:

    Duration timeout = Duration.ofMillis(500);
    Flux.range(0, 10000)
        .delayElements(Duration.ofMillis(10))
        .doOnNext(System.out::println)
        .takeUntilOther(Mono.delay(timeout).then(Mono.error(new TimeoutException())))
        // .takeUntilOther(Mono.never().timeout(Duration.ofMillis(500))) // based on Michael's answer this is also an option
        .doOnError(t -> {
            if (t instanceof TimeoutException) {
                System.out.println("I timed out");
            }
        })
        .blockLast();
    

    如果使用 .collectList().then() 运算符将 Flux 转换为 Mono,那么您可以在此之后简单地应用 .timeout(...) 运算符,它将按照您的要求运行:

    Flux.range(0, 10000)
        .delayElements(Duration.ofMillis(10))
        .doOnNext(System.out::println)
        .collectList()
        .timeout(Duration.ofMillis(500))
        .doOnError(t -> {
            if (t instanceof TimeoutException) {
                System.out.println("I timed out");
            }
        })
        .block();
    

    【讨论】:

    • 感谢您的解决方案!不幸的是,我无法将我的Flux 转换为Mono,所以看起来迈克尔的回答更符合我正在寻找的内容:)
    • @helencrump 有道理。我看到 Michael 的答案对你不适用,所以我用似乎满足你要求的替代方法更新了我的答案。
    • 啊,太好了,takeUntilOther 完成了我想要的工作 - 非常感谢 Martin!
    【解决方案2】:

    除了使用take(),您可以使用.mergeWith(Flux.never().timeout(Duration.ofMillis(500))) 将您的通量与另一个在特定时间范围后总是抛出超时异常的通量合并。

    举个例子,你会做这样的事情:

    Flux.range(0, 10000)
            .delayElements(Duration.ofMillis(10))
            .mergeWith(Flux.<Integer>never().timeout(Duration.ofMillis(500)))
            .doOnNext(System.out::println)
            .doOnError(t -> {
                if (t instanceof TimeoutException) {
                    System.out.println("I timed out");
                }
            })
            .collectList()
            .block();
    

    ...这会给你类似的东西:

    (...snip)
    24
    25
    26
    27
    28
    29
    30
    31
    I timed out
    

    【讨论】:

    • 谢谢!这是一个很好的解决方案。但是,如果我正确阅读了 Flux.never() 的文档,它看起来永远不会发出完成信号,并且从我自己的代码中测试来看,我的 Flux 似乎永远不会完成,直到 Flux.never() 超时,不幸的是不是我要找的,除非我误解了什么。
    • 当我的 Flux 完成时,感觉就像我想发出 Flux.never() (或同等)完成的信号,但我不确定这是否可行。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-03-12
    • 1970-01-01
    • 2021-01-20
    • 1970-01-01
    • 2019-08-25
    • 1970-01-01
    • 2017-01-12
    相关资源
    最近更新 更多