【发布时间】:2016-10-07 18:11:22
【问题描述】:
目前我有一个带有 Spring Integration DSL 的应用程序,它具有带有不同服务激活器的 AMQP 入站网关,每个服务激活器都有一种逻辑来决定、转换和调用外部 Web 服务(目前使用 CXF),但所有这些逻辑都在没有 Spring Integration 组件的代码。
这些服务激活器受到监控,在从该应用程序返回数据的输出通道中,是一个 AMQP 适配器,它将标头发送到队列(之后,所有标头都被处理并保存在数据库中以供将来分析)。这很好用,这些服务激活器甚至在标头中已经过时间。
现在的问题是,如果发生错误,我需要监控外部 Web 服务调用,例如每个操作中经过的时间、调用了哪个服务端点和操作。
我一直在想,每个服务激活器中的逻辑代码都应该转换成一个 Spring Integration 流,在每个服务激活器中,都会调用一个新的网关,并在 header 中使用 web 服务的操作名称,并监控我现在一直在做的每一个流程。
所以,我不确定这种手动方法是否是更好的方法,所以我想知道是否有办法使用某种拦截器或类似于 CXF 或 Spring WS 的东西来获取服务操作的名称以避免以手动方式在标题中设置操作的名称?你有什么建议?
这里有更多的上下文是 Spring Integration 配置:
@Bean
public IntegrationFlow inboundFlow() {
return IntegrationFlows.from(Amqp.inboundGateway(simpleMessageListenerContainer())
.mappedReplyHeaders(AMQPConstants.AMQP_CUSTOM_HEADER_FIELD_NAME_MATCH_PATTERN)
.mappedRequestHeaders(AMQPConstants.AMQP_CUSTOM_HEADER_FIELD_NAME_MATCH_PATTERN)
.errorChannel(gatewayErrorChannel())
.requestChannel(gatewayRequestChannel())
.replyChannel(gatewayResponseChannel())
)
.enrichHeaders(new Consumer<HeaderEnricherSpec>() {
@Override
public void accept(HeaderEnricherSpec t) {
t.headerExpression(AMQPConstants.START_TIMESTAMP, "T(java.lang.System).currentTimeMillis()");
}
})
.transform(getCustomFromJsonTransformer())
.route(new HeaderValueRouter(AMQPConstants.OPERATION_ROUTING_KEY))
.get();
}
@Bean
public MessageChannel gatewayRequestChannel() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public MessageChannel gatewayResponseChannel() {
return MessageChannels.publishSubscribe().get();
}
private IntegrationFlow loggerOutboundFlowTemplate(MessageChannel fromMessageChannel) {
return IntegrationFlows.from(fromMessageChannel)
.handle(Amqp.outboundAdapter(new RabbitTemplate(getConnectionFactory()))
.exchangeName(LOGGER_EXCHANGE_NAME)
.routingKey(LOGGER_EXCHANGE_ROUTING_KEY)
.mappedRequestHeaders("*"))
.get();
}
这是一个典型的服务激活器,如您所见,所有这些逻辑都可能是一个集成流:
@ServiceActivator(inputChannel="myServiceActivator", outputChannel = ConfigurationBase.MAP_RESPONSE_CHANNEL_NAME)
public Message<Map<String, Object>> myServiceActivator(Map<String, Object> input, @Header(AMQPConstants.SESSION) UserSession session) throws MyException {
Message<Map<String, Object>> result = null;
Map<String, Object> mapReturn = null;
ExternalService port = serviceConnection.getExternalService();
try {
if (input.containsKey(MappingConstants.TYPE)) {
Request request = transformer
.transformRequest(input, session);
Response response = port
.getSomething(request);
utils.processBackendCommonErrors(response.getCode(), response.getResponse());
mapReturn = transformer.convertToMap(response);
} else {
Request request = transformer
.transformRequest(input, session);
Response response = port
.getSomethingElse(request);
utils.processBackendCommonErrors(response.getCode(),
response.getResponse());
mapReturn = transformer.convertToMap(response);
}
} catch (RuntimeException e) {
String message = "unexcepted exception from the back-end";
logger.warn(message, e);
throw MyException.generateTechnicalException(message, null, e);
}
result = MessageBuilder.withPayload(mapReturn)
.build();
return result;
}
【问题讨论】:
标签: java spring cxf spring-integration spring-ws