【问题标题】:Spring Integration application with external Web Services monitoring具有外部 Web 服务监控的 Spring Integration 应用程序
【发布时间】:2016-10-07 18:11:22
【问题描述】:

目前我有一个带有 Spring Integration DSL 的应用程序,它具有带有不同服务激活器的 AMQP 入站网关,每个服务激活器都有一种逻辑来决定、转换和调用外部 Web 服务(目前使用 CXF),但所有这些逻辑都在没有 Spring Integration 组件的代码。

这些服务激活器受到监控,在从该应用程序返回数据的输出通道中,是一个 AMQP 适配器,它将标头发送到队列(之后,所有标头都被处理并保存在数据库中以供将来分析)。这很好用,这些服务激活器甚至在标头中已经过时间。

现在的问题是,如果发生错误,我需要监控外部 Web 服务调用,例如每个操作中经过的时间、调用了哪个服务端点和操作。

我一直在想,每个服务激活器中的逻辑代码都应该转换成一个 Spring Integration 流,在每个服务激活器中,都会调用一个新的网关,并在 header 中使用 web 服务的操作名称,并监控我现在一直在做的每一个流程。

所以,我不确定这种手动方法是否是更好的方法,所以我想知道是否有办法使用某种拦截器或类似于 CXF 或 Spring WS 的东西来获取服务操作的名称以避免以手动方式在标题中设置操作的名称?你有什么建议?

这里有更多的上下文是 Spring Integration 配置:

@Bean
public IntegrationFlow inboundFlow() {
    return IntegrationFlows.from(Amqp.inboundGateway(simpleMessageListenerContainer())
            .mappedReplyHeaders(AMQPConstants.AMQP_CUSTOM_HEADER_FIELD_NAME_MATCH_PATTERN)
            .mappedRequestHeaders(AMQPConstants.AMQP_CUSTOM_HEADER_FIELD_NAME_MATCH_PATTERN)
            .errorChannel(gatewayErrorChannel())
            .requestChannel(gatewayRequestChannel())
            .replyChannel(gatewayResponseChannel())
        )
        .enrichHeaders(new Consumer<HeaderEnricherSpec>() {
                @Override
                public void accept(HeaderEnricherSpec t) {
                    t.headerExpression(AMQPConstants.START_TIMESTAMP, "T(java.lang.System).currentTimeMillis()");
                }

         })
        .transform(getCustomFromJsonTransformer())
        .route(new HeaderValueRouter(AMQPConstants.OPERATION_ROUTING_KEY))
        .get();
}

@Bean
public MessageChannel gatewayRequestChannel() {
    return MessageChannels.publishSubscribe().get();
}

@Bean
public MessageChannel gatewayResponseChannel() {
    return MessageChannels.publishSubscribe().get();
}

private IntegrationFlow loggerOutboundFlowTemplate(MessageChannel fromMessageChannel) {
    return IntegrationFlows.from(fromMessageChannel)
        .handle(Amqp.outboundAdapter(new RabbitTemplate(getConnectionFactory()))
            .exchangeName(LOGGER_EXCHANGE_NAME)
            .routingKey(LOGGER_EXCHANGE_ROUTING_KEY)
            .mappedRequestHeaders("*"))
        .get();
}

这是一个典型的服务激活器,如您所见,所有这些逻辑都可能是一个集成流:

@ServiceActivator(inputChannel="myServiceActivator", outputChannel = ConfigurationBase.MAP_RESPONSE_CHANNEL_NAME)
public Message<Map<String, Object>> myServiceActivator(Map<String, Object> input, @Header(AMQPConstants.SESSION) UserSession session) throws MyException {
    Message<Map<String, Object>> result = null;
    Map<String, Object> mapReturn = null;

    ExternalService port = serviceConnection.getExternalService();
    try {
        if (input.containsKey(MappingConstants.TYPE)) {
            Request request = transformer
                    .transformRequest(input, session);

            Response response = port
                    .getSomething(request);

            utils.processBackendCommonErrors(response.getCode(), response.getResponse());
            mapReturn = transformer.convertToMap(response);
        } else {
            Request request = transformer
                    .transformRequest(input, session);

            Response response = port
                    .getSomethingElse(request);

            utils.processBackendCommonErrors(response.getCode(),
                    response.getResponse());
            mapReturn = transformer.convertToMap(response);
        }
    } catch (RuntimeException e) {
        String message = "unexcepted exception from the back-end";
        logger.warn(message, e);
        throw MyException.generateTechnicalException(message, null, e);
    }

    result = MessageBuilder.withPayload(mapReturn)
            .build();

    return result;
}

【问题讨论】:

    标签: java spring cxf spring-integration spring-ws


    【解决方案1】:

    到目前为止一切顺利。或者我不明白问题所在,或者你不清楚问题在哪里。

    无论如何,您始终可以使用AOP 代理任何 Spring 服务,因为看起来您正在指向代码:

    Response response = port
                    .getSomething(request);
    

    当调用此(或类似)方法时,某些MethodInterceptor 可以执行所需的跟踪逻辑并将结果发送到某些MessageChannel 以进行进一步分析或其他任何事情:

    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Extract required operation name and start_date from the MethodInvocation
        Object result = invocation.proceed();
        // Extract required data from the response
        // Build message and send to the channel
        return result;
    }
    

    【讨论】:

    • 我正在记录我的标头中包含的所有内容,使用当前的服务激活器 (SA),SA 名称在标头中,因此日志包含执行的 SA,并且以同样的方式我希望在标题中包含被调用的外部服务和操作的名称。我认为可以使用新的集成流程,但是应该从第一次调用中将具有操作名称的新网关作为标头插入,所以我想知道是否有一种自动方法来获取 WS 操作的名称,得到这个无需手动设置名称操作。
    • “操作”是什么意思?它在哪里?谁控制它?所以,只需使用这些东西并将其作为标题提供到日志消息中。
    • 我的意思是,第三方 Web 服务方法或操作,如果使用 AOP、Web 服务拦截器或标头丰富器,我可以获取 Web 服务方法的名称并以某种方式将此方法名称放在标头中调用网关时无需手动将名称设置为标头
    • 再次注意您的第一个响应,使用 AOP 和 methodInterceptor 如何将我想要的信息返回到 MessageChannel?我如何将 mehotdInterceptor 包含在 Spring 集成流程中?
    • 我向您指出了有关 AOP 的文档。这与 Spring Integration 无关。您向我展示调用外部服务的代码,我向您展示如何在不改变逻辑的情况下获得灵活的解决方案。您仍然没有告诉我您的操作名称在哪里,但我相信您将能够从该 MethodInterceptor 中提取它。如果您想将其更改为 IntegrationFlow,那将是一个完全不同的问题
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-10-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-10-29
    • 1970-01-01
    相关资源
    最近更新 更多