【问题标题】:Sending JMS messages in a Spring WebFlux reactive handler: is it blocking?在 Spring WebFlux 反应式处理程序中发送 JMS 消息:是否阻塞?
【发布时间】:2018-09-29 15:13:34
【问题描述】:

这是被动处理的正确方法吗?我看到 2 个线程一个反应性 nio,直到并包括 flatMap(fareRepo::save)。另一个线程是计算线程,它从发送消息开始一直持续到 ServerResponse.build()。我的问题是这种响应式处理请求的正确方法吗?注意:fareRepo 是响应式 couchbase repo。 谢谢

return request.bodyToMono(Fare.class).flatMap(fareRepo::save).flatMap(fs -> {
            logger.info("sending message: {}, to queue", fs.getId());
            jmsTemplate.send("fare-request-queue", (session) -> session.createTextMessage(fs.getId()));
            return Mono.just(fs);
        }).flatMap(fi -> ServerResponse.created(URI.create("/fare/" + fi.getId())).build());

【问题讨论】:

  • 你的问题是题外话,因为它主要是基于意见的,但这个fareRepo::save 可能是同步操作,所以你有点混合非阻塞的东西和阻塞的东西。我会说这不是一个好习惯。
  • @john ,忘了说 fareRepo 也是反应式的。
  • @jzqa 不需要提及,因为您正在调用flatMap :)

标签: spring reactive-programming spring-webflux


【解决方案1】:

Spring JmsTemplate 会阻塞你的请求线程,这不利于响应式设计编码。您可以尝试使用 .publishOn(Schedulers.elastic()) 它将创建新线程并执行代码而不会阻塞请求线程。因为它是 I/O 绑定操作,所以使用 Schedulers.elastic()

return request.bodyToMono(Fare.class).flatMap(fareRepo::save)
.publishOn(Schedulers.elastic())
.flatMap(fs -> {
            logger.info("sending message: {}, to queue", fs.getId());
            jmsTemplate.send("fare-request-queue", (session) -> session.createTextMessage(fs.getId()));
            return Mono.just(fs);
        }).flatMap(fi -> ServerResponse.created(URI.create("/fare/" + fi.getId())).build());

【讨论】:

    【解决方案2】:

    我假设您正在使用 Spring Framework 的 JmsTemplate 实现,它是阻塞的。

    如果没有更多上下文,我们只能假设您在反应式运算符中间有一个阻塞操作,这将导致您的应用程序出现问题。

    【讨论】:

    • 假设是这种情况,我发现一个(不好的)替代方法是在反应流的末尾调用.block(),但这当然违背了使用 WebFlux 的目的
    猜你喜欢
    • 2021-06-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-06-08
    • 2018-06-06
    • 2013-06-25
    • 2021-11-16
    • 1970-01-01
    相关资源
    最近更新 更多