【问题标题】:Spring Webflux using a blocking HttpClient in a Reactive StackSpring Webflux 在反应式堆栈中使用阻塞 HttpClient
【发布时间】:2021-06-04 22:14:20
【问题描述】:

我目前在一个构建微服务的项目中,并试图从更传统的 Spring Boot RestClient 迁移到使用 Netty 和 WebClient 作为 HTTP 客户端的 Reactive Stack,以便连接到后端系统。

这对于使用 REST API 的后端来说进展顺利,但是我仍然在将 WebClient 用于连接到 SOAP 后端和 Oracle 数据库的服务(仍然使用传统的 JDBC)时遇到一些困难。

我设法在网上找到了一些关于 JDBC 调用的解决方法,这些调用利用并行调度程序来发布阻塞 JDBC 调用的结果:

//the method that is called by @Service
@Override
public Mono<TransactionManagerModel> checkTransaction(String transactionId, String channel, String msisdn) {
    return asyncCallable(() -> checkTransactionDB(transactionId, channel, msisdn))
            .onErrorResume(error -> Mono.error(error));
}

...

//the actual JDBC call
private TransactionManagerModel checkTransactionDB(String transactionId, String channel, String msisdn) {
...
    List<TransactionManagerModel> result = 
                    jdbcTemplate.query(CHECK_TRANSACTION, paramMap, new BeanPropertyRowMapper<>(TransactionManagerModel.class));
...
}

//Generic async callable
private <T> Mono<T> asyncCallable(Callable<T> callable) {
    return Mono.fromCallable(callable).subscribeOn(Schedulers.parallel()).publishOn(transactionManagerJdbcScheduler);
}

我认为这很有效。

对于 SOAP 调用,我所做的是将 SOAP 调用封装在 Mono 中,而 SOAP 调用本身使用的是 CloseableHttpClient,这显然是一个阻塞 HTTP 客户端。

//The method that is being 'reactive'
public Mono<OfferRs> addOffer(String transactionId, String channel, String serviceId, OfferRq request) {
...
    OfferRs result = adapter.addOffer(transactionId, channel, generateRequest(request));
...
}

//The SOAP adapter that uses blocking HTTP Client
public OfferRs addOffer(String transactionId, String channel, JAXBElement<OfferRq> request) {
...
    response = (OfferRs) getWebServiceTemplate().marshalSendAndReceive(url, request, webServiceMessage -> {
            try {
                SoapHeader soapHeader = ((SoapMessage) webServiceMessage).getSoapHeader();
                    
                ObjectFactory headerFactory = new ObjectFactory();
                AuthenticationHeader authHeader = headerFactory.createAuthenticationHeader();
                authHeader.setUserName(username);
                authHeader.setPassWord(password);
                    
                JAXBContext headerContext = JAXBContext.newInstance(AuthenticationHeader.class);
                Marshaller marshaller = headerContext.createMarshaller();
                marshaller.marshal(authHeader, soapHeader.getResult());
            } catch (Exception ex) {
                log.error("Failed to marshall SOAP Header!", ex);
            }
        });
        return response;
...
}

我的问题是:SOAP 调用的这种实现是否足够“反应”,以至于我不必担心某些调用在微服务的某些部分被阻塞?我已经实现了响应式堆栈 - 显式调用 block() 将引发异常,因为如果使用 Netty,则不允许这样做。

或者我是否也应该在 SOAP 调用中调整并行 Schedulers 的使用?

【问题讨论】:

  • 进行反应式关系数据库调用的一个选项是 R2DBC - r2dbc.io/https://r2dbc.io 有一个等效的 spring 数据库。
  • @MichaelMcFadyen 我看过 R2DBC,但不幸的是,对 Oracle DB 驱动程序的支持......缺乏。 Oracle 响应式驱动程序需要 JDK11,而我的项目仍然停留在 8。docs.oracle.com/en/database/oracle/oracle-database/21/jjdbc/… 不过对于 SOAP,我仍然不确定
  • 好点。我不知道直到 TIL 还不支持 oracle 驱动程序。
  • 所有阻塞调用都应该放在它们自己的调度器上,这样它们就不会阻塞任何常规调度线程。您的soap 请求可能应该像您对jdbc 调用所做的那样完成。我唯一有意见的是Schedulers.parallel() 将在多个内核上创建工作程序,这可能是不需要的,因为有一个设置时间,并且通常只有在 CPU 密集型工作时才需要使用多个内核。第二件事是publishOn的使用。
  • 所以你现在声明的是,当有人订阅时,会自动为他们分配一个随机内核上的线程,然后当我们到达publishOn 语句时,当前线程将从指定线程切换在transactionManagerJdbcScheduler 中的线程的指定核心上(我不知道这是什么类型的Scheduler。我可能只是从onSubscribe 上放置一个Schedulers.boundedElastic() 开始,因为这个调度程序将扩大规模根据需要向下,并在 60 秒后删除未使用的线程。它有一个 cpu 核心 x 10 线程的上限。

标签: java spring spring-boot reactive-programming spring-webflux


【解决方案1】:

经过一些讨论,我会写一个答案。

Reactor 文档指出you should place blocking calls on their own schedulers。这基本上是为了保持反应器的非阻塞部分继续运行,如果有东西进入那个阻塞,那么反应器将回退到传统的 servlet 行为,这意味着为每个请求分配一个线程。

Reactor 有关于 schedulers 他们的类型等的非常好的文档。

但很短:

订阅

当有人订阅时,reactor 将进入一个名为 assembly phase 的东西,这意味着它基本上会从订阅点开始向后上游调用操作员,直到找到数据的生产者(例如数据库或其他服务等) .如果它在这个阶段的某个地方找到onSubscribe-operator,它将把整个链放在它自己定义的Scheduler 上。所以要知道的一件好事是onSubscribe 的位置并不重要,只要在assembly phase 期间发现它,整个链都会受到影响。

示例用法可能是:

我们有对数据库的阻塞调用,使用阻塞休息客户端的慢速调用,在阻塞庄园中从系统读取文件等。

onPublish

如果您在assembly phase 期间在链中的某处有onPublish,则链将知道它的放置位置,链将从默认调度程序切换到该特定点的指定调度程序。所以onPublish 位置很重要。因为它将在放置的位置切换。这个操作符更多的是控制你想在代码中的特定点将某些东西放在特定的调度程序上。

示例用法可能是:

您正在特定点进行一些繁重的阻塞 cpu 计算,您可以切换到 Scheduler.parallell(),这将保证所有计算都将放在单独的核心上做繁重的 cpu 工作,完成后您可以切换返回默认调度程序。

上例

你的肥皂调用应该放在自己的Scheduler 上,如果它们被阻塞,我认为onSubscribe 就足够了,使用Schedulers.elasticBound() 就可以得到传统的servlet 行为。如果您害怕在同一个调度程序上进行每个阻塞调用,您可以在 asyncCallable 函数中传递 Scheduler 并拆分调用以使用不同的 Schedulers

【讨论】:

  • 谢谢!在这个讨论之后,我也可以对 JDBC 调用进行更多调整。很高兴能够了解有关响应式的更多信息。
猜你喜欢
  • 1970-01-01
  • 2018-09-29
  • 2019-07-18
  • 1970-01-01
  • 2022-08-22
  • 1970-01-01
  • 2018-02-08
  • 2020-10-09
  • 2019-04-12
相关资源
最近更新 更多