你正在打破反应链。
在反应式编程中在您订阅之前不会发生任何事情。
那是什么意思,我可以举个小例子。
// If running this, nothing happens
Mono.just("Foobar");
同时:
Mono.just("Foobar").subscribe(s -> System.out.println(s));
将打印:
Foobar
如果你有一个函数,这也适用
public void getString() {
Mono.just("Foobar");
}
// Nothing happens, you have declared something
// but it will never get run, no one is subscribing
getString();
你需要做什么:
public Mono<String> getString() {
// This could be saving to a database or anything, this will now get run
return Mono.just("Now this code will get run");
}
// The above got run, we can prove it by printing
getString().subscribe(s -> System.out.println(s));
那么发生了什么?好吧,在响应式编程中,一旦有人订阅了 Mono 或 Flux,程序就会向上遍历并构建一个回调链,直到找到开始产生值的生产者(在我的例子中是 just 语句)。这个阶段称为“组装阶段”。当这个阶段完成后,反应链将开始为订阅者产生价值。
如果没有人订阅,则不会建立链。
那么谁是订阅者?它通常是价值的最终消费者。例如,发起调用的网页或移动应用程序,但如果它是发起调用的那个(例如 cron 作业),也可以是您的 Spring Boot 服务。
让我们看看你的代码:
@Transactional /* **Transaction** */
@GetMapping("/batchFetchData")
public Flux<Object> batchFetch() {
long startTime = System.currentTimeMillis();
// Here you declare a Mono but ignoring the return type so breaking the reactive chain
Mono.from(databaseConfiguration.connectionFactory().create())
.flatMapMany(connection -> Flux.from(connection
.createBatch() /* **Creating batch***/
.add("SELECT * FROM xtable where xId = 232323")
.add("SELECT * FROM ytable where yId = 454545")
.add("SELECT * FROM ztable where zId = 676767")
//.execute())); /* **Execution batch***/
.execute())).as(StepVerifier::create)
.expectNextCount(3) /* **Expect count batch***/
.verifyComplete(); /* **Verify batch***/
// Here at the end you have no subscriber
LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));
// Null is not allowed in reactive chains
return null;
}
那你是怎么解决的呢?
你不需要打破反应链。这是基本的反应式编程。
@Transactional
@GetMapping("/batchFetchData")
public Mono<Void> batchFetch() {
long startTime = System.currentTimeMillis();
// we return here so that the calling client
// can subscribe and start the chain
return Mono.from(databaseConfiguration.connectionFactory().create())
.flatMapMany(connection -> Flux.from(connection
.createBatch()
.add("SELECT * FROM xtable where xId = 232323")
.add("SELECT * FROM ytable where yId = 454545")
.add("SELECT * FROM ztable where zId = 676767")
.execute()))
.then();
// then() statement throws away whatever the return
// value is and just signals to the calling client
// when everything is done.
}
“我不想退货”
这就是Mono#then 语句的用途。您会看到,当链中的每个部分完成时,它会发出完成信号,然后将值从一个部分传递到下一个部分,然后再次发出信号,并传递值等。当我们到达 then 语句时,它只会发出信号COMPLETE 并且不返回任何内容(或者实际上它正在返回 Mono<Void>,因为在反应链中不允许 null )。您必须始终返回,以便每一步都可以传递其 COMPLETE 信号。
我还删除了您在代码中使用的 StepVerifier,因为它通常用于验证单元测试中的步骤,而不是在生产代码中使用。你可以在这里StepVerifier阅读更多信息。
如果你想学习反应式编程,我建议你这样做,因为它很棒而且我喜欢它,我强烈建议你阅读优秀的反应器文档Introduction to reactive programming,他们将解释什么都不会发生的概念直到你订阅等等。