【发布时间】:2017-11-16 11:19:48
【问题描述】:
我正在尝试实现从远程系统队列 (System.A.out) 中读取 request 消息的 Camel Route 该路由查看消息正文并将其动态路由到其他系统queue (System.B.in) 然后这条 Route 完成,并等待来自队列的下一条消息(当前它阻塞并等待临时队列上的响应)
System.B 读取其输入队列(System.B.in,并不总是骆驼路由)处理消息并在其输出队列(System.B.out)系统上丢弃 响应。 B 使用 Request 消息中的 JMSMessageID 作为其 response 中的 JMSCorrelationID,这就是它从请求中保留的全部内容。
骆驼路由(类似于 System.A.out,但在 System.B.out 上侦听)获取响应消息并使用 JMSCorrelationID(请求将没有 JMSCorrelationID,因此将通过消息路由body) 找到请求的 JMSReplyTo 队列 (System.A.in) 并将 System.A 上的响应丢弃在队列中以供 System.A 处理。
我使用的是SpringBoot和Camel 2.18.3,消息队列是IMB MQ版本8
我的路线如下所示:
@Override
public void configure() throws Exception {
//@formatter:off
Predicate validRoute = header("route-valid").isEqualTo(true);
Predicate inValidRoute = header("route-valid").isEqualTo(false);
Predicate splitRoute = header("route-split").isEqualTo(true);
Predicate singleRoute = header("route-split").isEqualTo(false);
Predicate validSplitRoute = PredicateBuilder.and(validRoute, splitRoute);
Predicate validSingelRoute = PredicateBuilder.and(validRoute, singleRoute);
from(endpoint(incomingURI)).routeId(routeId)
.process(exchange -> {
exchange.getIn().setHeader("route-source", format("%s-%s", incomingURI, routeId));
})
.to(endpoint(format("bean:evaluateIncomingMessageService?method=routeMessage(*, %s)", replyToURI)))
.choice()
.when(validSingelRoute)
.log(DEBUG, "Creating a Single route")
.to(endpoint("bean:messageCoalitionService?method=saveInstruction(*)"))
.setExchangePattern(ExchangePattern.InOut)
.toD("${header.route-recipients}")
.when(inValidRoute)
.log(DEBUG, "a.b.test", format("Incoming message [%s] failed evaluation: %s", incomingURI, body()))
.to(endpoint(deadLetterURI))
.routeId(format("%s-%s", incomingURI, routeId))
.when(validSplitRoute)
.log(DEBUG, "Creating a Split route")
.to(endpoint("bean:messageCoalitionService?method=saveInstructions(*)"))
.setExchangePattern(ExchangePattern.InOut)
.multicast()
.toD("${header.route-recipients}").endChoice()
.otherwise()
.log(DEBUG, "a.b.test", format("Incoming message [%s] failed evaluation: %s", incomingURI, body()))
.to(endpoint(deadLetterURI))
.routeId(format("%s-%s", incomingURI, routeId));
Spring Bean evaluateIncomingMessageService 决定消息是Request(无关联ID)还是Response,并为请求设置路由标头。我希望 Camel 会自动将响应路由到 Request.JMSReplyTo 队列,如果不是,如何做到这一点?
replyToURI 在 Camel Route 构建器中配置,如果路由在 System.A.out 上侦听,则它的 replyToURI 将始终为 System.A.in。
evaluateIncomingMessageService.routeMessage 看起来像这样:
public void routeMessage(final Exchange exchange, final String replyToURI) {
String correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class);
if (correlationId != null) {
log.debug("Processing Message Response with JMSCorrelationID [{}]", correlationId);
exchange.getIn().setHeader("JMSReplyTo", replyToURI);
} else {
// Request Messages have nave NO correlationId
log.debug("Processing Message Request with MessageID [{}] and JMSMessageID: [{}]",
exchange.getIn().getMessageId(),
exchange.getIn().getHeader("JMSMessageID") != null ? exchange.getIn().getHeader("JMSMessageID").toString() : exchange.getIn().getMessageId());
String message = exchange.getIn().getBody(String.class);
Set<ContentBasedRoute> validRoutes = contentBasedRouting
.stream().filter(
routeEntity -> Pattern.compile(
routeEntity.getRegularExpression(), DOTALL).matcher(message).matches()).collect(Collectors.toSet());
if (validRoutes.isEmpty()) {
log.warn("No valid routes found for message: [{}] ", message);
exchange.getIn().setHeader("route-valid", false);
} else {
HashMap<String, ContentBasedRoute> uniqueRoutes = new HashMap<>();
validRoutes.stream().forEach(route -> uniqueRoutes.putIfAbsent(route.getDestination(), route));
exchange.getIn().setHeader("route-valid", true);
exchange.getIn().setHeader("route-count", uniqueRoutes.size());
exchange.getIn().setHeader("JMSReplyTo", replyToURI);
//if (exchange.getIn().getHeader("JMSMessageID") == null) {
// exchange.getIn().setHeader("JMSMessageID", exchange.getIn().getMessageId());
//}
if (uniqueRoutes.size() > 1) {
log.debug("Building a split route");
StringBuilder routes = new StringBuilder();
StringBuilder routeIds = new StringBuilder();
StringBuilder routeRegex = new StringBuilder();
uniqueRoutes.keySet().stream().forEach(i -> routes.append(i).append(","));
uniqueRoutes.values().stream().forEach(j -> routeIds.append(j.getRouteId()).append(","));
uniqueRoutes.values().stream().forEach(k -> routeRegex.append(k.getRegularExpression()).append(","));
routes.deleteCharAt(routes.length() - 1);
routeIds.deleteCharAt(routeIds.length() - 1);
routeRegex.deleteCharAt(routeRegex.length() - 1);
exchange.getIn().setHeader("route-split", true);
exchange.getIn().setHeader("route-uuid", routeIds.toString());
exchange.getIn().setHeader("route-regex", routeRegex.toString());
exchange.getIn().setHeader("route-recipients", routes.toString());
} else {
exchange.getIn().setHeader("route-split", false);
exchange.getIn().setHeader("route-uuid", uniqueRoutes.values().iterator().next().getRouteId());
exchange.getIn().setHeader("route-regex", uniqueRoutes.values().iterator().next().getRegularExpression());
exchange.getIn().setHeader("route-recipients", uniqueRoutes.values().iterator().next().getDestination());
}
}
}
}
Bean messageCoalitionService 只是保存消息正文和标头,以便可以复制消息并进行系统审核。
我不确定我是否做错了,我应该使用 Camel Async API 还是需要管道来实现它?这个模式看起来很接近我需要的http://camel.apache.org/async.html(异步请求回复)任何帮助将非常感谢。
【问题讨论】: