【问题标题】:Mono switchIfEmpty() is always called单声道 switchIfEmpty() 总是被调用
【发布时间】:2019-06-19 19:10:00
【问题描述】:

我有两种方法。
主要方法:

@PostMapping("/login")
public Mono<ResponseEntity<ApiResponseLogin>> loginUser(@RequestBody final LoginUser loginUser) {
    return socialService.verifyAccount(loginUser)
            .flatMap(socialAccountIsValid -> {
                if (socialAccountIsValid) {
                    return this.userService.getUserByEmail(loginUser.getEmail())
                            .switchIfEmpty(insertUser(loginUser))
                            .flatMap(foundUser -> updateUser(loginUser, foundUser))
                            .map(savedUser -> {
                                String jwts = jwt.createJwts(savedUser.get_id(), savedUser.getFirstName(), "user");
                                return new ResponseEntity<>(HttpStatus.OK);
                            });
                } else {
                    return Mono.just(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
                }
            });

}

而这个被调用的方法(服务调用外部api):

public Mono<User> getUserByEmail(String email) {
    UriComponentsBuilder builder = UriComponentsBuilder
            .fromHttpUrl(USER_API_BASE_URI)
            .queryParam("email", email);
    return this.webClient.get()
            .uri(builder.toUriString())
            .exchange()
            .flatMap(resp -> {
                if (Integer.valueOf(404).equals(resp.statusCode().value())) {
                    return Mono.empty();
                } else {
                    return resp.bodyToMono(User.class);
                }
            });
} 

在上面的例子中,switchIfEmpty() 总是从 main 方法中调用,即使返回了带有 Mono.empty() 的结果。

我找不到这个简单问题的解决方案。
以下也不起作用:

Mono.just(null) 

因为该方法会抛出一个NullPointerException

我也不能使用 flatMap 方法来检查 foundUser 是否为空。
遗憾的是,如果我返回 Mono.empty(),则根本不会调用 flatMap,因此我也无法在此处添加条件。

@SimY4

   @PostMapping("/login")
    public Mono<ResponseEntity<ApiResponseLogin>> loginUser(@RequestBody final LoginUser loginUser) {
        userExists = false;
        return socialService.verifyAccount(loginUser)
                .flatMap(socialAccountIsValid -> {
                    if (socialAccountIsValid) {
                        return this.userService.getUserByEmail(loginUser.getEmail())
                                .flatMap(foundUser -> {
                                    return updateUser(loginUser, foundUser);
                                })
                                .switchIfEmpty(Mono.defer(() -> insertUser(loginUser)))
                                .map(savedUser -> {
                                    String jwts = jwt.createJwts(savedUser.get_id(), savedUser.getFirstName(), "user");
                                    return new ResponseEntity<>(HttpStatus.OK);
                                });
                    } else {
                        return Mono.just(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
                    }
                });

    }

【问题讨论】:

  • 我不确定这句话是否正确。 switchIfEmpty() is always called from the main method, even when a result with Mono.empty() is returned.。它的意思是被称为不是吗?
  • 你能详细说明你的问题吗? ''switchIfEmpty() 总是从 main 方法调用,即使返回 Mono.empty() 的结果''。这是预期的行为。
  • @Barath 我想要实现的是,如果外部服务返回404,我可以从服务层返回一个值为null的Mono,可以通过main方法处理。我想我也可以抛出一个错误,但我不想这样做。 404应该在服务层处理,当找不到用户时,这是我觉得应该用if处理的应用程序逻辑,而不是通过异常处理。我将在文档中查看switfhIfEmpty。还是可行的建议?
  • @PrashantPandey 请参阅上面的评论。
  • @Trace,您的代码仍然有效,如果 404,您将返回 Mono.empty() ,它将调用 switchIfEmpty。无论如何,如果您想处理错误(如果这是您正在寻找的内容),那么您可以使用onErrorResume() 并进行适当处理,或者您也可以使用onErrorReturn()guide

标签: java spring-boot reactive-programming spring-webflux project-reactor


【解决方案1】:

这是因为 switchIfEmpty “按值”接受 Mono。这意味着即使在您订阅单声道之前,这个替代单声道的评估已经被触发。

想象一下这样的方法:

Mono<String> asyncAlternative() {
    return Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }));
}

如果你这样定义你的代码:

Mono<String> result = Mono.just("Some payload").switchIfEmpty(asyncAlternative());

无论在流构建过程中如何,它都会触发替代。为了解决这个问题,您可以使用 Mono.defer 推迟对第二个单声道的评估

Mono<String> result = Mono.just("Some payload")
        .switchIfEmpty(Mono.defer(() -> asyncAlternative()));

这样它只会在请求替代时打印“Hi there”

UPD:

详细说明我的答案。您面临的问题与 Reactor 无关,而是与 Java 语言本身以及它如何解析方法参数有关。让我们检查一下我提供的第一个示例中的代码。

Mono<String> result = Mono.just("Some payload").switchIfEmpty(asyncAlternative());

我们可以把它改写成:

Mono<String> firstMono = Mono.just("Some payload");
Mono<String> alternativeMono = asyncAlternative();
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

这两个代码 sn-ps 在语义上是等价的。我们可以继续拆开它们,看看问题出在哪里:

Mono<String> firstMono = Mono.just("Some payload");
CompletableFuture<String> alternativePromise = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }); // future computation already tiggered
Mono<String> alternativeMono = Mono.fromFuture(alternativePromise);
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

正如您所见,当我们开始编写 Mono 类型时,已经触发了未来的计算。为了防止不必要的计算,我们可以将未来包装到延迟评估中:

Mono<String> result = Mono.just("Some payload")
        .switchIfEmpty(Mono.defer(() -> asyncAlternative()));

哪一个会展开

Mono<String> firstMono = Mono.just("Some payload");
Mono<String> alternativeMono = Mono.defer(() -> Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }))); // future computation defered
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

在第二个示例中,future 被困在一个惰性供应商中,并且仅在被请求时才被安排执行。

【讨论】:

  • 我试过这个,但是你定义的方法asyncAlternative(),尽管Mono.defer()总是被触发。
  • @Trace 你能告诉我你尝试了什么吗?因为 defer 的唯一目的是不允许在 time 之前进行评估。
  • 请查看更新后的帖子。在调试模式下,我看到 Mono.defer 在之后被调用,但这并不能消除它总是被执行,即使 this.userService.getUserByEmail(loginUser.getEmail()) 没有返回 Mono.empty()
  • It'll always trigger alternative no matter what during stream construction. 那有什么用。这是一个名为switchIfEmpty 的方法。有些事情真的没有意义。
  • 您的回答是正确的。我把它分解了,原来switchIfEmpty被触发的原因实际上是因为updateUser返回了一个带有http状态码204的空正文!我有点不情愿地修改了 api,但现在它可以正常工作了。谢谢!
【解决方案2】:

对于那些尽管回答得很好,但仍然不明白为什么会出现这种行为的人:

Reactor 源(Mono.xxx 和 Flux.xxx)是:

  • 懒惰评估:源的内容只有在订阅者订阅时才会评估/触发;

  • 热切评估:源的内容甚至在订阅者订阅之前就立即评估。

Mono.just(xxx)Flux.just(xxx)Flux.fromIterable(x,y,z) 之类的表达式是急切的。

通过使用defer(),您可以强制对源进行延迟评估。这就是接受的答案有效的原因。

这样做:

 someMethodReturningAMono()
  .switchIfEmpty(buildError());

buildError() 依赖热切的资源来创建替代 Mono 将始终在订阅之前进行评估:

Mono<String> buildError(){
       return Mono.just("An error occured!"); //<-- evaluated as soon as read
}

为防止这种情况发生,请执行以下操作:

 someMethodReturningAMono()
  .switchIfEmpty(Mono.defer(() -> buildError()));

阅读此answer 了解更多信息。

【讨论】:

    猜你喜欢
    • 2021-05-02
    • 2020-08-07
    • 1970-01-01
    • 2017-09-14
    • 2012-02-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多