【问题标题】:Project Reactor + flatMap + Multiple onErrorComplete - Not working as expectedProject Reactor + flatMap + 多个 onErrorComplete - 未按预期工作
【发布时间】:2020-09-24 04:51:06
【问题描述】:

当多个onErrorContinue 添加到管道以处理从flatMap 抛出的特定类型的异常时,异常处理无法按预期工作。

下面的代码,我希望,元素 1 到 6 应该被删除,元素 7 到 10 应该被订阅者消费。

public class FlatMapOnErrorContinueExample {
    public static void main(String[] args) {
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .flatMap(number -> {
                    if (number <= 3) {
                        return Mono.error(new NumberLesserThanThree("Number is lesser than 3"));
                    } else if (number > 3 && number <= 6) {
                        return Mono.error(new NumberLesserThanSixButGretherThan3("Number is grether than 6"));
                    } else {
                        return Mono.just(number);
                    }
                })
                .onErrorContinue(NumberLesserThanThree.class,
                        (throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 3"))

                .onErrorContinue(NumberLesserThanSixButGretherThan3.class,
                        (throwable, object) -> System.err.println("Exception: Dropping the element because it is lesser than 6 but grether than 3"))

                .onErrorContinue((throwable, object) ->
                        System.err.println("Exception: " + throwable.getMessage()))

                .subscribe(number -> System.out.println("number is " + number),
                        error -> System.err.println("Exception in Subscription " + error.getMessage()));
    }

    public static class NumberLesserThanThree extends RuntimeException {
        public NumberLesserThanThree(final String msg) {
            super(msg);
        }
    }

    public static class NumberLesserThanSixButGretherThan3 extends RuntimeException {
        public NumberLesserThanSixButGretherThan3(final String msg) {
            super(msg);
        }
    }
}

这是我得到的输出:

Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception: Dropping the element because it is lesser than 3
Exception in Subscription Number is grether than 6

问题:为什么没有调用第二个onErrorContinue,而是将异常发送给订阅者?

补充说明: 如果我删除第一个和第二个onErrorContinue,那么所有异常都由第三个onErrorContinue 处理。我可以使用这种方法来接收所有异常并检查异常类型并继续处理。但是,我想让它更清晰的异常处理,而不是添加 if..else 块。

这个问题与Why does Thread.sleep() trigger the subscription to Flux.interval()?有何不同

1)关于异常处理和异常处理顺序的这个问题;另一个问题是关于并行处理元素并使主线程等待所有元素处理完成 3)这个问题对线程没有任何关注,即使在. subscribe之后添加Thread.sleep(10000),行为也没有变化。

【问题讨论】:

    标签: project-reactor spring-reactor


    【解决方案1】:

    这又归结为onErrorContinue 的异常行为。它打破了规则,因为它不会“捕获”错误,然后因此改变下游的行为,它实际上允许支持操作员“向前看”并做出相应的行为,从而改变结果上游 em>。

    这很奇怪,并且会导致一些不是立即显而易见的行为,例如这里的情况。据我所知,所有支持的运算符只向前看 next onErrorContinue 运算符,而不是递归搜索所有此类运算符。相反,他们将评估下一个onErrorContinue 的谓词(在这种情况下,它是否属于某种类型),然后做出相应的行为——如果谓词返回 true,则调用处理程序,否则将错误抛出下游。 (它不会移动到 next onErrorContinue 运算符,然后是下一个,直到匹配谓词。)

    显然这是一个人为的例子 - 但由于这些特质,我几乎总是建议避免使用onErrorContinue。在涉及flatMap() 的情况下,可能会发生两种“正常”方式:

      1234563来处理这些错误。您可以链接onErrorResume(),因为它适用于下游,而不是上游运营商。这是迄今为止最常见的情况。
    1. If flatMap() 是 if / else 的命令式集合,它返回不同的发布者,例如它在这里,您希望/必须保持命令式风格,抛出异常而不是使用 Mono.error(),并捕获酌情返回Mono.empty()以防出错:

        .flatMap(number -> {
            try {
                if (number <= 3) {
                    throw new NumberLessThanThree();
                } else if (number <= 6) {
                    throw new NumberLessThanSixButGreaterThan3();
                } else {
                    return Mono.just(number);
                }
            }
            catch(NumberLessThanThree ex) {
                //Handle it
                return Mono.empty();
            }
            catch(NumberLessThanSixButGreaterThan3 ex) {
                //As above
            }
        })
    

    一般来说,使用这两种方法中的一种可以更容易地推断出正在发生的事情。

    (为了在阅读 cmets 后保持完整性 - 这与反应链在主线程退出之前无法完成无关。)

    【讨论】:

    • 感谢您的详细解释和澄清以及我可以克服限制的几种方法。是的!这是我为理解行为而写的一个人为的实现;但这与我的不太接近,正如您提到的“内部反应链”,它由最多 4 个运算符组成。我想赞成方法#1而不是#2。因此,让我使用 onErrorResume 重新排序我的操作符和管道,看看是否可以实现和改进错误处理。
    • @NaveenKumar 是的,如果可以使用#1,它绝对是更好的方法。恕我直言,混合命令式和反应式有点代码味道,这会使代码更难推理。
    • @MichaelBerry 假设如果没有 flatMap 运算符而是一个简单的地图运算符,并且我们正在从热门发布者那里消费。因此,如果我需要避免使用 onErrorContinue,我正在考虑使用 .retry() 运算符,但这可能会导致热发布者遗漏某些元素,因为 retry() 将花费少量时间重新订阅热发布者.那么,这种情况下的解决方案是什么?
    猜你喜欢
    • 2021-01-12
    • 1970-01-01
    • 1970-01-01
    • 2019-11-16
    • 2016-07-23
    • 2013-07-12
    • 1970-01-01
    • 2020-01-01
    • 1970-01-01
    相关资源
    最近更新 更多