【问题标题】:Using reactive(r2dbc) batch with transaction将反应式(r2dbc)批处理与事务一起使用
【发布时间】:2021-03-10 15:56:42
【问题描述】:

您好,我有一个非常重要的问题。 我正在尝试使用反应式 r2dbc 创建一个批处理并使用事务性来注释该方法。 但看起来如果我同时使用事务代码和批处理代码,代码就会挂起并且不起作用。 下面是代码

    @Transactional /* **Transaction** */
    @GetMapping("/batchFetchData")
    public Flux<Object> batchFetch() {
        long startTime = System.currentTimeMillis();
        Mono.from(databaseConfiguration.connectionFactory().create())
                .flatMapMany(connection -> Flux.from(connection
                        .createBatch() /* **Creating batch***/
                        .add("SELECT * FROM xtable where xId = 232323")
                        .add("SELECT * FROM ytable where yId = 454545")
                        .add("SELECT * FROM ztable where zId = 676767")
                        //.execute()));  /* **Execution batch***/
                        .execute())).as(StepVerifier::create)
                .expectNextCount(3) /* **Expect count batch***/
                .verifyComplete();  /* **Verify batch***/

        LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));
    return null;
    }

【问题讨论】:

  • 在您订阅之前什么都不会发生。您忽略了单声道的返回。将其更改为 return Mono.from( .... ) 您正在通过不返回来破坏反应链,因此客户端无法订阅。
  • 我不确定这有什么关系。问题是当我使用评论(验证完成)时,事务就会挂起。毕竟它只是批处理,预计不会返回任何东西

标签: java spring-data spring-webflux spring-data-r2dbc r2dbc


【解决方案1】:

你正在打破反应链。

在反应式编程中在您订阅之前不会发生任何事情

那是什么意思,我可以举个小例子。

// If running this, nothing happens
Mono.just("Foobar");

同时:

Mono.just("Foobar").subscribe(s -> System.out.println(s));

将打印:

Foobar

如果你有一个函数,这也适用

public void getString() {
    Mono.just("Foobar");
}

// Nothing happens, you have declared something 
// but it will never get run, no one is subscribing
getString();

你需要做什么:

public Mono<String> getString() {
    // This could be saving to a database or anything, this will now get run
    return Mono.just("Now this code will get run");
}

// The above got run, we can prove it by printing
getString().subscribe(s -> System.out.println(s));

那么发生了什么?好吧,在响应式编程中,一旦有人订阅了 Mono 或 Flux,程序就会向上遍历并构建一个回调链,直到找到开始产生值的生产者(在我的例子中是 just 语句)。这个阶段称为“组装阶段”。当这个阶段完成后,反应链将开始为订阅者产生价值。

如果没有人订阅,则不会建立链。

那么谁是订阅者?它通常是价值的最终消费者。例如,发起调用的网页或移动应用程序,但如果它是发起调用的那个(例如 cron 作业),也可以是您的 Spring Boot 服务。

让我们看看你的代码:

@Transactional /* **Transaction** */
@GetMapping("/batchFetchData")
public Flux<Object> batchFetch() {
    long startTime = System.currentTimeMillis();

    // Here you declare a Mono but ignoring the return type so breaking the reactive chain
    Mono.from(databaseConfiguration.connectionFactory().create()) 
            .flatMapMany(connection -> Flux.from(connection
                    .createBatch() /* **Creating batch***/
                    .add("SELECT * FROM xtable where xId = 232323")
                    .add("SELECT * FROM ytable where yId = 454545")
                    .add("SELECT * FROM ztable where zId = 676767")
                    //.execute()));  /* **Execution batch***/
                    .execute())).as(StepVerifier::create)
            .expectNextCount(3) /* **Expect count batch***/
            .verifyComplete();  /* **Verify batch***/
            // Here at the end you have no subscriber

    LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));

    // Null is not allowed in reactive chains
    return null;
}

那你是怎么解决的呢?

你不需要打破反应链。这是基本的反应式编程。

@Transactional
@GetMapping("/batchFetchData")
public Mono<Void> batchFetch() {
    long startTime = System.currentTimeMillis();

    // we return here so that the calling client 
    // can subscribe and start the chain
    return Mono.from(databaseConfiguration.connectionFactory().create()) 
            .flatMapMany(connection -> Flux.from(connection
                    .createBatch()
                    .add("SELECT * FROM xtable where xId = 232323")
                    .add("SELECT * FROM ytable where yId = 454545")
                    .add("SELECT * FROM ztable where zId = 676767")
                    .execute()))
                    .then(); 
                    // then() statement throws away whatever the return 
                    // value is and just signals to the calling client 
                    // when everything is done.       
}

“我不想退货”

这就是Mono#then 语句的用途。您会看到,当链中的每个部分完成时,它会发出完成信号,然后将值从一个部分传递到下一个部分,然后再次发出信号,并传递值等。当我们到达 then 语句时,它只会发出信号COMPLETE 并且不返回任何内容(或者实际上它正在返回 Mono&lt;Void&gt;,因为在反应链中不允许 null )。您必须始终返回,以便每一步都可以传递其 COMPLETE 信号。

我还删除了您在代码中使用的 StepVerifier,因为它通常用于验证单元测试中的步骤,而不是在生产代码中使用。你可以在这里StepVerifier阅读更多信息。

如果你想学习反应式编程,我建议你这样做,因为它很棒而且我喜欢它,我强烈建议你阅读优秀的反应器文档Introduction to reactive programming,他们将解释什么都不会发生的概念直到你订阅等等。

【讨论】:

    【解决方案2】:

    你的问题是:

    return null;
    

    您应该在响应式应用程序中返回 Mono/Flux,即使流中没有项目,也应该返回 Mono.emtpy

    查看我的示例insert Multi records

    而测试,使用StepVerify to verify the result

    对于 WebFlux 应用程序中的事务支持,您必须阅读相关文档以检查它是否支持一般本地事务或使用它时的限制。

    如果支持良好,有两种使用事务的方法。

    1. 注入一个TransactionalOperator(类似于传统的TransactionTemplate)来封装你的业务逻辑。
    2. 一般对类或方法应用@Transaction 注释。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-03-17
      • 1970-01-01
      • 1970-01-01
      • 2013-04-10
      • 2017-12-16
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多