【问题标题】:What's the correct way to use webflux and reactorwebflux和reactor的正确使用方法是什么
【发布时间】:2020-10-30 17:13:53
【问题描述】:

我正在学习 webflux 和 reactor。得到以下三种测试方法。 “documentOperations.findById”和“documentOperations.delete”是两个数据库操作。我知道 test1 不好,因为两个数据库操作放在一个异步方法中。我的问题是:
test2 和 test3 对系统性能的影响是否相同?或者换句话说,哪个更好?

private Mono<ServerResponse> test1(ServerRequest request, Contexts contexts) {
    return request.body(bodyDocumentExtractor)
            .flatMap(doc -> {
                Document document = documentOperations.findById(doc.getId());
                documentOperations.delete(document.getId());
                return ServerResponse.noContent().build();
            });
}

private Mono<ServerResponse> test2(ServerRequest request, Contexts contexts) {
    return request.body(bodyDocumentExtractor)
            .flatMap(doc -> {
                return Mono.just(documentOperations.findById(doc.getId()))
                        .flatMap(document -> {
                            documentOperations.delete(document.getId());
                            return ServerResponse.noContent().build();
                        });
            });
}

private Mono<ServerResponse> test3(ServerRequest request, Contexts contexts) {
    return request.body(bodyDocumentExtractor)
            .flatMap(doc -> {
                return Mono.just(documentOperations.findById(doc.getId()));
            }).flatMap(document -> {
                documentOperations.delete(document.getId());
                return ServerResponse.noContent().build();
            });
}

【问题讨论】:

  • 我假设documentOperations 上的操作与一些外部数据源交互。因此,这两种方法都应返回包装在 Mono 中的结果。
  • @Michael Xin 在使用 Webflux 时最好使用响应式数据库客户端。每当您需要连接到外部详细信息(例如 dbs、http 资源)时,请始终使用响应式客户端。对于 dbs checkout ReactiveMongoRepository,对于 http 调用 checkout WebClient。如果您找不到这样的响应式客户端,请查看用于处理阻塞行为的 spring 准则。 docs.spring.io/spring-framework/docs/current/reference/html/…
  • 您的数据库返回的具体值直接告诉我它们正在阻塞。所以以上都不好,都是坏的。

标签: java mono spring-webflux reactor


【解决方案1】:

上面的例子都不好。您所有的数据库调用都返回具体类型,这意味着它们都是阻塞调用。

// returns the concrete type
// thread does the call, needs to wait until we get the value (document)
Document document = documentOperations.findById("1");

如果它是非阻塞的,则返回 Mono&lt;T&gt;Flux&lt;T&gt;

// Returns a mono, so we know it's not blocking. 
// We can chain on actions with for example flatMap etc.
Mono<Document> document = documentOperations.findById("1");

如果您必须使用阻塞数据库,例如 oracle 数据库等。您需要将它放在它自己的整个线程上,这样它就不会阻塞任何主工作线程。这可以通过调度程序来完成。所以在这个例子中,当客户端订阅时,它将被放置在一个单独的线程中。

Mono<Document> document = Mono.fromCallable(() -> documentOperations.findById("1"))
    .subscribeOn(Schedulers.boundedElastic());;

所以对于你的例子:

private Mono<ServerResponse> test3(ServerRequest request, Contexts contexts) {
    return request.body(bodyDocumentExtractor)
        .flatMap(doc -> Mono.fromCallable(() -> documentOperations.findById(doc.getId()))
        .flatMap(document -> Mono.fromCallable(() -> documentOperations.delete(document.getId()))
                .then(ServerResponse.noContent().build());
        ).subscribeOn(Schedulers.boundedElastic());
}

Reactor documentation - Wrap blocking calls

【讨论】:

    【解决方案2】:

    基于DocumentOperations 是反应性的假设,我希望代码看起来与下面的代码行类似。

    private Mono<ServerResponse> test3(ServerRequest request, Contexts contexts) {
        return request.body(bodyDocumentExtractor)
                .flatMap(doc -> documentOperations.findById(doc.getId()))
                .flatMap(document -> documentOperations.delete(document.getId()))
                .then(ServerResponse.noContent().build());
    }
    

    【讨论】:

      猜你喜欢
      • 2017-09-20
      • 2019-05-17
      • 2014-12-08
      • 1970-01-01
      • 2023-03-15
      • 2011-11-11
      • 2015-03-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多