【问题标题】:Camel JMS Asynchronous Request ReplyCamel JMS 异步请求回复
【发布时间】:2017-11-16 11:19:48
【问题描述】:

我正在尝试实现从远程系统队列 (System.A.out) 中读取 request 消息的 Camel Route 该路由查看消息正文并将其动态路由到其他系统queue (System.B.in) 然后这条 Route 完成,并等待来自队列的下一条消息(当前它阻塞并等待临时队列上的响应)

System.B 读取其输入队列(System.B.in,并不总是骆驼路由)处理消息并在其输出队列(System.B.out)系统上丢弃 响应。 B 使用 Request 消息中的 JMSMessageID 作为其 response 中的 JMSCorrelationID,这就是它从请求中保留的全部内容。

骆驼路由(类似于 System.A.out,但在 System.B.out 上侦听)获取响应消息并使用 JMSCorrelationID(请求将没有 JMSCorrelationID,因此将通过消息路由body) 找到请求的 JMSReplyTo 队列 (System.A.in) 并将 System.A 上的响应丢弃在队列中以供 System.A 处理。

我使用的是SpringBoot和Camel 2.18.3,消息队列是IMB MQ版本8

我的路线如下所示:

@Override
public void configure() throws Exception {

    //@formatter:off
    Predicate validRoute = header("route-valid").isEqualTo(true);
    Predicate inValidRoute = header("route-valid").isEqualTo(false);
    Predicate splitRoute = header("route-split").isEqualTo(true);
    Predicate singleRoute = header("route-split").isEqualTo(false);
    Predicate validSplitRoute = PredicateBuilder.and(validRoute, splitRoute);
    Predicate validSingelRoute = PredicateBuilder.and(validRoute, singleRoute);

    from(endpoint(incomingURI)).routeId(routeId)
        .process(exchange -> {
                exchange.getIn().setHeader("route-source", format("%s-%s", incomingURI, routeId));
            })
            .to(endpoint(format("bean:evaluateIncomingMessageService?method=routeMessage(*, %s)", replyToURI)))
            .choice()
                .when(validSingelRoute)
                    .log(DEBUG, "Creating a Single route")
                    .to(endpoint("bean:messageCoalitionService?method=saveInstruction(*)"))
                    .setExchangePattern(ExchangePattern.InOut)
                    .toD("${header.route-recipients}")
                .when(inValidRoute)
                    .log(DEBUG, "a.b.test", format("Incoming message [%s] failed evaluation: %s", incomingURI, body()))
                    .to(endpoint(deadLetterURI))
                    .routeId(format("%s-%s", incomingURI, routeId))
                .when(validSplitRoute)
                    .log(DEBUG, "Creating a Split route")
                    .to(endpoint("bean:messageCoalitionService?method=saveInstructions(*)"))
                    .setExchangePattern(ExchangePattern.InOut)
                    .multicast()
                    .toD("${header.route-recipients}").endChoice()
                .otherwise()
                    .log(DEBUG, "a.b.test", format("Incoming message [%s] failed evaluation: %s", incomingURI, body()))
                    .to(endpoint(deadLetterURI))
                    .routeId(format("%s-%s", incomingURI, routeId));

Spring Bean evaluateIncomingMessageService 决定消息是Request(无关联ID)还是Response,并为请求设置路由标头。我希望 Camel 会自动将响应路由到 Request.JMSReplyTo 队列,如果不是,如何做到这一点?

replyToURI 在 Camel Route 构建器中配置,如果路由在 System.A.out 上侦听,则它的 replyToURI 将始终为 System.A.in。

evaluateIncomingMessageService.routeMessage 看起来像这样:

 public void routeMessage(final Exchange exchange, final String replyToURI) {
    String correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class);

    if (correlationId != null) {
        log.debug("Processing Message Response with JMSCorrelationID [{}]", correlationId);
        exchange.getIn().setHeader("JMSReplyTo", replyToURI);
    } else {
        // Request Messages have nave NO correlationId
        log.debug("Processing Message Request with MessageID [{}] and JMSMessageID: [{}]",
                exchange.getIn().getMessageId(),
                exchange.getIn().getHeader("JMSMessageID") != null ? exchange.getIn().getHeader("JMSMessageID").toString() : exchange.getIn().getMessageId());
        String message = exchange.getIn().getBody(String.class);
        Set<ContentBasedRoute> validRoutes = contentBasedRouting
                .stream().filter(
                        routeEntity -> Pattern.compile(
                                routeEntity.getRegularExpression(), DOTALL).matcher(message).matches()).collect(Collectors.toSet());

        if (validRoutes.isEmpty()) {
            log.warn("No valid routes found for message: [{}] ", message);
            exchange.getIn().setHeader("route-valid", false);

        } else {
            HashMap<String, ContentBasedRoute> uniqueRoutes = new HashMap<>();
            validRoutes.stream().forEach(route -> uniqueRoutes.putIfAbsent(route.getDestination(), route));

            exchange.getIn().setHeader("route-valid", true);
            exchange.getIn().setHeader("route-count", uniqueRoutes.size());
            exchange.getIn().setHeader("JMSReplyTo", replyToURI);
            //if (exchange.getIn().getHeader("JMSMessageID") == null) {
             //   exchange.getIn().setHeader("JMSMessageID", exchange.getIn().getMessageId());
            //}
            if (uniqueRoutes.size() > 1) {
                log.debug("Building a split route");
                StringBuilder routes = new StringBuilder();
                StringBuilder routeIds = new StringBuilder();
                StringBuilder routeRegex = new StringBuilder();
                uniqueRoutes.keySet().stream().forEach(i -> routes.append(i).append(","));
                uniqueRoutes.values().stream().forEach(j -> routeIds.append(j.getRouteId()).append(","));
                uniqueRoutes.values().stream().forEach(k -> routeRegex.append(k.getRegularExpression()).append(","));
                routes.deleteCharAt(routes.length() - 1);
                routeIds.deleteCharAt(routeIds.length() - 1);
                routeRegex.deleteCharAt(routeRegex.length() - 1);

                exchange.getIn().setHeader("route-split", true);
                exchange.getIn().setHeader("route-uuid", routeIds.toString());
                exchange.getIn().setHeader("route-regex", routeRegex.toString());
                exchange.getIn().setHeader("route-recipients", routes.toString());
            } else {
                exchange.getIn().setHeader("route-split", false);
                exchange.getIn().setHeader("route-uuid", uniqueRoutes.values().iterator().next().getRouteId());
                exchange.getIn().setHeader("route-regex", uniqueRoutes.values().iterator().next().getRegularExpression());
                exchange.getIn().setHeader("route-recipients", uniqueRoutes.values().iterator().next().getDestination());
            }
        }
    }
}

Bean messageCoalitionService 只是保存消息正文和标头,以便可以复制消息并进行系统审核。

我不确定我是否做错了,我应该使用 Camel Async API 还是需要管道来实现它?这个模式看起来很接近我需要的http://camel.apache.org/async.html(异步请求回复)任何帮助将非常感谢。

【问题讨论】:

    标签: spring-boot apache-camel


    【解决方案1】:

    最后,我使用 Spring Integration 实现了上述功能。一旦 Camel Route 发送了消息,我就无法找到一种方法来检索已发送消息的消息 ID,这意味着当响应被发回时,我无法跟踪相关 ID。使用 Camel InOut 导致 Camel 阻塞并等待响应,这也不是我想要的。

    感谢 lutalex 提供的解决方案: http://forum.spring.io/forum/other-spring-related/remoting/30397-jmsmessageid-after-message-is-sent?p=745127#post745127

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-11-08
      • 1970-01-01
      • 2012-10-25
      • 1970-01-01
      • 2020-10-11
      • 1970-01-01
      相关资源
      最近更新 更多