【问题标题】:Reactor how to repeat some step n times if condition is met如果满足条件,反应堆如何重复某些步骤 n 次
【发布时间】:2025-12-02 03:10:01
【问题描述】:

请帮我处理反应堆 我需要最多检查一个条件 n 次并返回最终结果

我发现reactor有reactor-extra模块

https://projectreactor.io/docs/extra/snapshot/api/reactor/retry/Repeat.html

它有结构 Repeat.create(java.util.function.Predicate<? super RepeatContext<T>> predicate, long n) 仅当谓词返回 true 时重复 n 次的重复函数。

看起来是正确的解决方案,但我不明白应该在哪里
那个动作,我想重复吗? 我有很多动作的 Flux,但我只想重复一个

请做一个代码示例

谢谢

private int culculateNextResult(some params) {
          // some implementation  
 }



private Boolean compareResults(int prevRes, int nextRes) {
          // some implementation
 }

 public Flux<Boolean> run(some params, Flux<Integer> prevResults){

      return prevResults.map(elem -> compareResults(elem, culculateNextResult(some params)));

 // THIS LOGIC SHOULD BE REPEATED N times if compareResults(elem,       
 // culculateNextResult(some params))) == false, if true, we don't need 
// to repeat 
     }

我想重复 compareResults(elem, culculateNextResult(some params))) 直到它是真的。但最大 n 倍并返回 Flux 作为结果

【问题讨论】:

    标签: project-reactor


    【解决方案1】:

    Flux.repeatMono.repeat 将重新订阅源,因此源的每个先前步骤都将通过新订阅重复。

    由于calculateNextResultcompareResults 在您的示例中都是同步操作,您可以使用简单的for 循环重复...

        public Flux<Boolean> run(some params, Flux<Integer> prevResults){
            return prevResults.map(elem -> {
                for (int i = 0; i < 5; i++) {
                    if (compareResults(elem, calculateNextResult(some params))) {
                        return true;
                    }
                }
                return false;
            });
        }
    

    如果calculateNextResultcompareResults 是返回Mono 的反应式方法,那么您可以使用flatMap 而不是map,并使用Mono.repeat* 方法之一。

    例如,像这样的:

        private Mono<Integer> calculateNextResult(some params) {
            // some implementation
        }
    
        private Mono<Boolean> compareResults(int prevRes, int nextRes) {
            // some implementation
        }
        public Flux<Boolean> run(some params, Flux<Integer> prevResults){
    
            return prevResults.flatMap(prevResult -> 
    
                calculateNextResult(some params)
                        .flatMap(nextResult -> compareResults(prevResult, nextResult))
                        .filter(comparisonResult -> comparisonResult)
                        .repeatWhenEmpty(Repeat.times(5))
                        .defaultIfEmpty(false));
        }
    
    

    在此示例中,repeatWhenEmpty 将导致对 flatMap 中创建的 Mono 的新订阅,这将导致 calculateNextResult 重新计算(假设从 calculateNextResult 返回的 Mono 设置为计算每个订阅的值)。

    【讨论】: