【发布时间】:2021-06-04 22:14:20
【问题描述】:
我目前在一个构建微服务的项目中,并试图从更传统的 Spring Boot RestClient 迁移到使用 Netty 和 WebClient 作为 HTTP 客户端的 Reactive Stack,以便连接到后端系统。
这对于使用 REST API 的后端来说进展顺利,但是我仍然在将 WebClient 用于连接到 SOAP 后端和 Oracle 数据库的服务(仍然使用传统的 JDBC)时遇到一些困难。
我设法在网上找到了一些关于 JDBC 调用的解决方法,这些调用利用并行调度程序来发布阻塞 JDBC 调用的结果:
//the method that is called by @Service
@Override
public Mono<TransactionManagerModel> checkTransaction(String transactionId, String channel, String msisdn) {
return asyncCallable(() -> checkTransactionDB(transactionId, channel, msisdn))
.onErrorResume(error -> Mono.error(error));
}
...
//the actual JDBC call
private TransactionManagerModel checkTransactionDB(String transactionId, String channel, String msisdn) {
...
List<TransactionManagerModel> result =
jdbcTemplate.query(CHECK_TRANSACTION, paramMap, new BeanPropertyRowMapper<>(TransactionManagerModel.class));
...
}
//Generic async callable
private <T> Mono<T> asyncCallable(Callable<T> callable) {
return Mono.fromCallable(callable).subscribeOn(Schedulers.parallel()).publishOn(transactionManagerJdbcScheduler);
}
我认为这很有效。
对于 SOAP 调用,我所做的是将 SOAP 调用封装在 Mono 中,而 SOAP 调用本身使用的是 CloseableHttpClient,这显然是一个阻塞 HTTP 客户端。
//The method that is being 'reactive'
public Mono<OfferRs> addOffer(String transactionId, String channel, String serviceId, OfferRq request) {
...
OfferRs result = adapter.addOffer(transactionId, channel, generateRequest(request));
...
}
//The SOAP adapter that uses blocking HTTP Client
public OfferRs addOffer(String transactionId, String channel, JAXBElement<OfferRq> request) {
...
response = (OfferRs) getWebServiceTemplate().marshalSendAndReceive(url, request, webServiceMessage -> {
try {
SoapHeader soapHeader = ((SoapMessage) webServiceMessage).getSoapHeader();
ObjectFactory headerFactory = new ObjectFactory();
AuthenticationHeader authHeader = headerFactory.createAuthenticationHeader();
authHeader.setUserName(username);
authHeader.setPassWord(password);
JAXBContext headerContext = JAXBContext.newInstance(AuthenticationHeader.class);
Marshaller marshaller = headerContext.createMarshaller();
marshaller.marshal(authHeader, soapHeader.getResult());
} catch (Exception ex) {
log.error("Failed to marshall SOAP Header!", ex);
}
});
return response;
...
}
我的问题是:SOAP 调用的这种实现是否足够“反应”,以至于我不必担心某些调用在微服务的某些部分被阻塞?我已经实现了响应式堆栈 - 显式调用 block() 将引发异常,因为如果使用 Netty,则不允许这样做。
或者我是否也应该在 SOAP 调用中调整并行 Schedulers 的使用?
【问题讨论】:
-
进行反应式关系数据库调用的一个选项是 R2DBC - r2dbc.io/https://r2dbc.io 有一个等效的 spring 数据库。
-
@MichaelMcFadyen 我看过 R2DBC,但不幸的是,对 Oracle DB 驱动程序的支持......缺乏。 Oracle 响应式驱动程序需要 JDK11,而我的项目仍然停留在 8。docs.oracle.com/en/database/oracle/oracle-database/21/jjdbc/… 不过对于 SOAP,我仍然不确定
-
好点。我不知道直到 TIL 还不支持 oracle 驱动程序。
-
所有阻塞调用都应该放在它们自己的调度器上,这样它们就不会阻塞任何常规调度线程。您的soap 请求可能应该像您对jdbc 调用所做的那样完成。我唯一有意见的是
Schedulers.parallel()将在多个内核上创建工作程序,这可能是不需要的,因为有一个设置时间,并且通常只有在 CPU 密集型工作时才需要使用多个内核。第二件事是publishOn的使用。 -
所以你现在声明的是,当有人订阅时,会自动为他们分配一个随机内核上的线程,然后当我们到达
publishOn语句时,当前线程将从指定线程切换在transactionManagerJdbcScheduler中的线程的指定核心上(我不知道这是什么类型的Scheduler。我可能只是从onSubscribe上放置一个Schedulers.boundedElastic()开始,因为这个调度程序将扩大规模根据需要向下,并在 60 秒后删除未使用的线程。它有一个 cpu 核心 x 10 线程的上限。
标签: java spring spring-boot reactive-programming spring-webflux