【问题标题】:Why there are multiple calls to DB为什么多次调用 DB
【发布时间】:2022-01-20 02:59:18
【问题描述】:

我正在使用 Postgre SQL 玩 R2DBC。我正在尝试的用例是通过 ID 以及语言、演员和类别获取电影。下面是架构

这是ServiceImpl中对应的一段代码

@Override
public Mono<FilmModel> getById(Long id) { 
    Mono<Film> filmMono = filmRepository.findById(id).switchIfEmpty(Mono.error(DataFormatException::new)).subscribeOn(Schedulers.boundedElastic());
    Flux<Actor> actorFlux = filmMono.flatMapMany(this::getByActorId).subscribeOn(Schedulers.boundedElastic());
    Mono<String> language = filmMono.flatMap(film -> languageRepository.findById(film.getLanguageId())).map(Language::getName).subscribeOn(Schedulers.boundedElastic());
    Mono<String> category = filmMono.flatMap(film -> filmCategoryRepository
                    .findFirstByFilmId(film.getFilmId()))
            .flatMap(filmCategory -> categoryRepository.findById(filmCategory.getCategoryId()))
            .map(Category::getName).subscribeOn(Schedulers.boundedElastic());

    return Mono.zip(filmMono, actorFlux.collectList(), language, category)
            .map(tuple -> {
                FilmModel filmModel = GenericMapper.INSTANCE.filmToFilmModel(tuple.getT1());
                List<ActorModel> actors = tuple
                        .getT2()
                        .stream()
                        .map(act -> GenericMapper.INSTANCE.actorToActorModel(act))
                        .collect(Collectors.toList());
                filmModel.setActorModelList(actors);
                filmModel.setLanguage(tuple.getT3());
                filmModel.setCategory(tuple.getT4());
                return filmModel;
            });
         }

日志显示 4 次调用电影

2021-12-16 21:21:20.026 DEBUG 32493 --- [ctor-tcp-nio-10] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT film.* FROM film WHERE film.film_id = $1 LIMIT 2]
2021-12-16 21:21:20.026 DEBUG 32493 --- [actor-tcp-nio-9] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT film.* FROM film WHERE film.film_id = $1 LIMIT 2]
2021-12-16 21:21:20.026 DEBUG 32493 --- [ctor-tcp-nio-12] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT film.* FROM film WHERE film.film_id = $1 LIMIT 2]
2021-12-16 21:21:20.026 DEBUG 32493 --- [actor-tcp-nio-7] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT film.* FROM film WHERE film.film_id = $1 LIMIT 2]
2021-12-16 21:21:20.162 DEBUG 32493 --- [actor-tcp-nio-9] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT language.* FROM language WHERE language.language_id = $1 LIMIT 2]
2021-12-16 21:21:20.188 DEBUG 32493 --- [actor-tcp-nio-7] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT film_actor.actor_id, film_actor.film_id, film_actor.last_update FROM film_actor WHERE film_actor.film_id = $1]
2021-12-16 21:21:20.188 DEBUG 32493 --- [ctor-tcp-nio-10] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT film_category.film_id, film_category.category_id, film_category.last_update FROM film_category WHERE film_category.film_id = $1 LIMIT 1]
2021-12-16 21:21:20.313 DEBUG 32493 --- [ctor-tcp-nio-10] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT category.* FROM category WHERE category.category_id = $1 LIMIT 2]
2021-12-16 21:21:20.563 DEBUG 32493 --- [actor-tcp-nio-7] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT actor.* FROM actor WHERE actor.actor_id = $1 LIMIT 2]

我不是在寻找 SQL 优化(连接等)。我绝对可以让它更高效。但问题是为什么我确实看到了对 Film 表的 4 个 SQL 查询。只是补充一下,我已经修复了代码。但无法理解核心原因。提前致谢。

【问题讨论】:

  • 这只是一般性评论,subscribeOn 会将整个订阅放在定义的调度程序上。这意味着您只需要一个。拥有多个不会有任何不同,reactor 将在组装阶段找到它找到的第一个并使用它。这在反应堆文档中有很好的介绍。因此,如果您坚持使用它,只需拥有一个并删除其余的。
  • subscribeOn 是在自己的线程中并行运行 fetch。每个都将在自己的线程中运行,而不是在单个线程中运行。
  • 请阅读我刚刚解释的内容。首先,您所拥有的所有东西都不会在parallel 中运行要并行运行,您需要ParallelFlux 您现在拥有的代码默认运行async,但您添加了onSubscribe,所以它实际运行在有人订阅时分配的单个线程上。请阅读文档projectreactor.io/docs/core/release/reference/#schedulers 你不需要它们。删除它们。不要使用平行通量,事情不会变得更快stackoverflow.com/questions/68972035/…。使用默认值
  • @Toerktumlare:感谢您的回复。在您发表评论后,我实际上删除了 onSubscribe 的代码并尝试了(以及 Sam Hughes 建议的 defer 组合,但没有它)。我看到的是代码正在不同的线程上执行(您可以查看日志)。我最初的假设是这是因为我将它安排在不同的线程上。但即使在删除它之后..它仍然安排在不同的线程上。这让我有点困惑。我能想到的唯一原因是..r2dbc 实际上是在不同的线程上调度它们。
  • 不,因为反应堆就是这样工作的,任何线程都可以随时做任何事情。 Reactor 将尝试在底层使用尽可能多的线程。这就是使用响应式的全部意义,这在文档中有所介绍。请阅读它们,它们会教你所有这些东西

标签: spring-webflux project-reactor spring-data-r2dbc r2dbc r2dbc-postgresql


【解决方案1】:

为什么我确实看到了对 Film 表的 4 个 SQL 查询。

原因很简单。您正在订阅Mono&lt;Film&gt; 4 次:

Mono<Film> filmMono = filmRepository.findById(id);

Flux<Actor> actorFlux = filmMono.flatMapMany(...); (1)
Mono<String> language = filmMono.flatMap(...); (2)
Mono<String> category = filmMono.flatMap(...); (3)
Mono.zip(filmMono, actorFlux.collectList(), language, category) (4)

每次订阅filmMono 都会触发一个新查询。请注意,您可以通过使用Mono#cache 运算符将filmMono 转换为热源并为所有四个订阅者缓存结果来更改它。

【讨论】:

  • 感谢您的回答和解决方案。这解决了这个问题。就线程调度而言,我确实看到了一些不同的东西。但我想我会创建一个关于线程调度的单独帖子。谢谢你今天教我一个新的操作。让我开心。
【解决方案2】:

我对你的堆栈不是很熟悉,所以这是一个高层次的回答你的“为什么”。稍后会有更具体的答案为您提供(例如,可以确认this thread 是否相关的人)。

虽然我不是 Spring Daisy(或 Spring 开发人员),但您将表达式绑定到 filmMono,解析为查询 select film.* from film....。您在不同的上下文中引用了该表达式四次,它被解析了四次。语句的排序可能是 lib 作者懒惰地评估您在本地绑定的表达式的部分成功尝试,以便它能够批处理四个意外相同的查询。您很可能通过收集到一个实际容器中来解决此问题,然后映射到该容器而不是绑定到 filmMono 的表达式。

一般来说,这种情况是因为当语言本身不支持惰性求值时,库作者可用的选项并不好。因为任何操作都可能改变数据集,所以库作者必须在以下选项中做出选择:

  • A,构建足够的脚手架来完全记录所需的所有资源,复制数据集以进行任何需要以某种方式改变记录的操作,并希望他们能够检测到在解析数据集时可能泄漏脚手架的任何边缘情况意料之中(要做到这一点……很难)。
  • B,将每个级别的映射解析为一个查询,针对它出现的每个上下文,以免任何操作以可能使集成者(例如您)感到惊讶的方式改变数据集。
  • C,如上所述,除了不复制原始请求,只需复制数据...在每一步。 Pass-by-copy 在 JVM 上变得非常痛苦,而且像 Clojure 和 Scala 这样的语言只需让开发人员非常具体地确定他们是想要就地变异还是复制然后变异。

在您的情况下,B 对编写该库的人来说是最有意义的。事实上,他们显然与 A 足够接近,以至于他们能够批处理通过解析绑定到 filmMono 的表达式(它们只是偶然相同)产生的所有查询,所以让我印象深刻。

可以重写许多访问模式以优化结果查询。您的里程数可能会有所不同……非常不同。熟悉原始 SQL,或者像 GraphQL 这样的专用语言,可以提供比关系映射器更一致的结果,但我更加欣赏良好的 IDE 支持,并且像这样混合域通常意味着放弃自动完成、上下文突出显示、语言服务器解决方案证明和 linting。

鉴于问题的范围是“为什么会发生这种情况?”,即使注意到我对您的堆栈不熟悉,答案是“在本机不支持它的语言中进行惰性评估真的很难。”

【讨论】:

  • 感谢您的解释。我知道懒惰和急切的评价。我尝试使用 defer 运算符更改代码并删除每个订阅的订阅,但行为是相同的。
  • 你搞砸了 switchIfEmpty() 吗?这里不是我的域,但链接的线程看起来很相关。具体测试如下: 1. 通过首先将该表达式的结果收集到容器中来确认它是从该行的构造方式得出的。 2. 定义一个具有成功属性和错误属性的类,具有初始警卫值,以及替换警卫值并随后忽略其参数的方法,仅返回解析的值,视情况而定成功或错误。初始化并将该方法传递给 switchIfEmpty()。也就是上面的选项C,但是对于这一步
  • 我将接受 lkatiforis 的回答。但是感谢您以更通用的方式解释这个概念。
  • @harry,哦,哎呀。我也只是赞成他的回答。
猜你喜欢
  • 2015-01-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-09-01
相关资源
最近更新 更多