【发布时间】:2022-01-20 02:59:18
【问题描述】:
我正在使用 Postgre SQL 玩 R2DBC。我正在尝试的用例是通过 ID 以及语言、演员和类别获取电影。下面是架构
这是ServiceImpl中对应的一段代码
@Override
public Mono<FilmModel> getById(Long id) {
Mono<Film> filmMono = filmRepository.findById(id).switchIfEmpty(Mono.error(DataFormatException::new)).subscribeOn(Schedulers.boundedElastic());
Flux<Actor> actorFlux = filmMono.flatMapMany(this::getByActorId).subscribeOn(Schedulers.boundedElastic());
Mono<String> language = filmMono.flatMap(film -> languageRepository.findById(film.getLanguageId())).map(Language::getName).subscribeOn(Schedulers.boundedElastic());
Mono<String> category = filmMono.flatMap(film -> filmCategoryRepository
.findFirstByFilmId(film.getFilmId()))
.flatMap(filmCategory -> categoryRepository.findById(filmCategory.getCategoryId()))
.map(Category::getName).subscribeOn(Schedulers.boundedElastic());
return Mono.zip(filmMono, actorFlux.collectList(), language, category)
.map(tuple -> {
FilmModel filmModel = GenericMapper.INSTANCE.filmToFilmModel(tuple.getT1());
List<ActorModel> actors = tuple
.getT2()
.stream()
.map(act -> GenericMapper.INSTANCE.actorToActorModel(act))
.collect(Collectors.toList());
filmModel.setActorModelList(actors);
filmModel.setLanguage(tuple.getT3());
filmModel.setCategory(tuple.getT4());
return filmModel;
});
}
日志显示 4 次调用电影
2021-12-16 21:21:20.026 DEBUG 32493 --- [ctor-tcp-nio-10] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT film.* FROM film WHERE film.film_id = $1 LIMIT 2]
2021-12-16 21:21:20.026 DEBUG 32493 --- [actor-tcp-nio-9] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT film.* FROM film WHERE film.film_id = $1 LIMIT 2]
2021-12-16 21:21:20.026 DEBUG 32493 --- [ctor-tcp-nio-12] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT film.* FROM film WHERE film.film_id = $1 LIMIT 2]
2021-12-16 21:21:20.026 DEBUG 32493 --- [actor-tcp-nio-7] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT film.* FROM film WHERE film.film_id = $1 LIMIT 2]
2021-12-16 21:21:20.162 DEBUG 32493 --- [actor-tcp-nio-9] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT language.* FROM language WHERE language.language_id = $1 LIMIT 2]
2021-12-16 21:21:20.188 DEBUG 32493 --- [actor-tcp-nio-7] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT film_actor.actor_id, film_actor.film_id, film_actor.last_update FROM film_actor WHERE film_actor.film_id = $1]
2021-12-16 21:21:20.188 DEBUG 32493 --- [ctor-tcp-nio-10] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT film_category.film_id, film_category.category_id, film_category.last_update FROM film_category WHERE film_category.film_id = $1 LIMIT 1]
2021-12-16 21:21:20.313 DEBUG 32493 --- [ctor-tcp-nio-10] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT category.* FROM category WHERE category.category_id = $1 LIMIT 2]
2021-12-16 21:21:20.563 DEBUG 32493 --- [actor-tcp-nio-7] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT actor.* FROM actor WHERE actor.actor_id = $1 LIMIT 2]
我不是在寻找 SQL 优化(连接等)。我绝对可以让它更高效。但问题是为什么我确实看到了对 Film 表的 4 个 SQL 查询。只是补充一下,我已经修复了代码。但无法理解核心原因。提前致谢。
【问题讨论】:
-
这只是一般性评论,
subscribeOn会将整个订阅放在定义的调度程序上。这意味着您只需要一个。拥有多个不会有任何不同,reactor 将在组装阶段找到它找到的第一个并使用它。这在反应堆文档中有很好的介绍。因此,如果您坚持使用它,只需拥有一个并删除其余的。 -
subscribeOn 是在自己的线程中并行运行 fetch。每个都将在自己的线程中运行,而不是在单个线程中运行。
-
请阅读我刚刚解释的内容。首先,您所拥有的所有东西都不会在
parallel中运行要并行运行,您需要ParallelFlux您现在拥有的代码默认运行async,但您添加了onSubscribe,所以它实际运行在有人订阅时分配的单个线程上。请阅读文档projectreactor.io/docs/core/release/reference/#schedulers 你不需要它们。删除它们。不要使用平行通量,事情不会变得更快stackoverflow.com/questions/68972035/…。使用默认值 -
@Toerktumlare:感谢您的回复。在您发表评论后,我实际上删除了 onSubscribe 的代码并尝试了(以及 Sam Hughes 建议的 defer 组合,但没有它)。我看到的是代码正在不同的线程上执行(您可以查看日志)。我最初的假设是这是因为我将它安排在不同的线程上。但即使在删除它之后..它仍然安排在不同的线程上。这让我有点困惑。我能想到的唯一原因是..r2dbc 实际上是在不同的线程上调度它们。
-
不,因为反应堆就是这样工作的,任何线程都可以随时做任何事情。 Reactor 将尝试在底层使用尽可能多的线程。这就是使用响应式的全部意义,这在文档中有所介绍。请阅读它们,它们会教你所有这些东西
标签: spring-webflux project-reactor spring-data-r2dbc r2dbc r2dbc-postgresql