【问题标题】:How to handle exception throw in Flux.fromStream如何处理 Flux.fromStream 中的异常抛出
【发布时间】:2021-12-22 14:16:20
【问题描述】:
    public Mono<ResponseEntity<Data>> getData(@RequestParam List<String> tagIds){
   
   Flux<S3Object> s3ObjectFlux = Flux.fromStream(tagIds.stream())
           .parallel()
           .runOn(Schedulers.boundedElastic())
           .flatMap(id -> fetchResources(id)) //S3Exception is thrown here 
           .flatMap(idS3Object -> Mono.just(s3Object))
           .doOnError((throwable) -> log.error(throwable))
           .ordered((u1, u2) -> u2.hashCode() - u1.hashCode());

   Mono<Data> data = s3ObjectFlux.collectList()
           .map(s3Objects -> new Data(s3Objects));  
}

我正在遍历 tagIds 并在此处获取 s3 对象,如果该对象不存在或引发任何异常,我想记录并忽略它并继续下一步。但是在这种情况下,如果在迭代时从 fetchResource 方法中抛出 S3Exception,则会将错误作为 500 抛出给用户,而不是我想要空列表。

我没有看到其他选项,例如 onErrorMap 或 onErrorReturn

【问题讨论】:

  • 我看到.flatMap(idS3Object -&gt; Mono.just(s3Object)) 不是必需的,如果省略,错误也会得到正确处理。我可以知道你为什么需要这一步吗?

标签: java reactive flux


【解决方案1】:

你会想要使用类似onErrorContinue的东西。

另外,这个.ordered((u1, u2) -&gt; u2.hashCode() - u1.hashCode()) 并没有什么意义,您可以使用sequential 之类的东西来合并“rails”。

另外,我不太明白这个:.flatMap(idS3Object -&gt; Mono.just(s3Object))。首先,它不能编译,可能你有错字,你想要一些东西flatMap(idS3Object -&gt; Mono.just(s3Object))。在这种形式下,从一个对象创建一个Mono 并对其执行flatMap 完全没有意义。

把所有这些放在一起,我假设你想要这样的东西:

Flux<S3Object> s3ObjectFlux = Flux.fromStream(tagIds.stream())
        .parallel()
        .runOn(Schedulers.boundedElastic())
        .flatMap(this::fetchResources)
        .sequential()
        .onErrorContinue((error, object) -> {
            log.error(error);
        });

【讨论】:

  • 谢谢@Ervin Szilagyi,但我不想只记录错误(我可以用doOnError来做,我想抛出不同的错误或只返回mono.empty
猜你喜欢
  • 2013-03-05
  • 1970-01-01
  • 2019-03-20
  • 1970-01-01
  • 1970-01-01
  • 2013-07-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多