【问题标题】:Error handling on an outbound gateway in async flow异步流中出站网关上的错误处理
【发布时间】:2018-01-02 12:52:02
【问题描述】:

我有一个这样的集成流程:

@Bean
public IntegrationFlow inboundRequestFlow()
{
    return IntegrationFlows.from( inboundRequestGateway() )
        .log( ... )
        .filter(  requestValidator,
            spec -> spec.discardChannel( invalidRequestChannel() ) )
        .bridge( spec -> spec.requiresReply( false ) )
        .channel( asyncFlowChannel )
        .wireTap(
            //sends a NO_CONTENT reply if the request is ok
            flow -> flow.enrichHeaders( spec -> spec.header( HttpHeaders.STATUS_CODE, HttpStatus.NO_CONTENT ).defaultOverwrite( true ) )
                .transform( payload -> "" )
                .channel( inboundGatewayReplyChannel() )
        ).get();

}

它在 http 网关上接收请求,对其进行验证,如果一切正常,则将请求发送到“asyncFlowChannel”并以 204 回复入站网关。

'asyncFlowChannel' 是另一个在 Executor 通道上运行的 IntegrationFlow 的起点:

@Bean
public IntegrationFlow outboundFlow()
{
    return IntegrationFlows.from( asyncFlowChannel)
        .log( ... )
        .transform( ... )
        .transform( ... )
        .split(... )            
        .resequence( ... )
        .enrichHeaders( ... )
        .log( ... )
        .transform( ... )
        .handle( this.outboundSOAPGateway() )
        .log( .. )
        .handle( ... )
        .bridge( spec -> spec.requiresReply( false ) )
        .channel( anotherAsyncFlowChannel )
        .get();
}

如果我的 outboundGateway 发生异常(由于网络相关的 IO 错误或错误响应),我想记录错误并采取适当的措施。但我无法在 outboundSOAPGateway 上设置错误通道,并且起始流程上的 inboundRequestGateway 已经收到它的回复。

我得到的错误的唯一线索是这个日志:

10:19:53.002 WARN [outbound-flow-0] org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel - 收到回复消息但接收线程已经收到回复:ErrorMessage [payload=..., headers =...]

我的问题是:在异步流中处理出站网关上的错误的正确方法是什么,其中启动流的 inboundGateway 已经收到它的回复?

【问题讨论】:

    标签: java spring-integration


    【解决方案1】:

    任何MessageHandler 端点都可以通过AbstractRequestHandlerAdvice 提供。其中之一是ExpressionEvaluatingRequestHandlerAdvice,您可以在其中捕获异常并将其发送到failureChannelhttps://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-endpoints-chapter.html#expression-advice

    为此,.handle( this.outboundSOAPGateway() ) 可以提供第二个参数,例如:

    .handle((GenericHandler<?>) (p, h) -> {
                        throw new RuntimeException("intentional");
                    }, e -> e.advice(retryAdvice()))
    

    在这种情况下,我使用

    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
        requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel()));
        return requestHandlerRetryAdvice;
    }
    

    但同样适用于ExpressionEvaluatingRequestHandlerAdvice

    顺便说一句,retryAdvice() 也可以为您解决问题。查看其ErrorMessageSendingRecoverer

    【讨论】:

    • 我在我的建议链中添加了新的 ErrorMessageSendingRecoverer(recoveryChannel()) 并且它起作用了......谢谢!
    【解决方案2】:

    .channel( asyncFlowChannel )代替

    .gateway(asyncFlowChannel, e -> e.errorChannel(...))
    

    【讨论】:

    • 另见我的回答。
    • 我尝试了这种方法并且错误通道被正确调用,但是入站网关不再异步回复...我也尝试了 .gateway( labelRequestInputChannel, spec -> spec.errorChannel( failedLabelRequestRecoveryChannel ).requiresReply ( false ) ) 但它仍然等待流程结束才能回复
    • 啊,是的;抱歉,网关必须在执行器通道之后。不过Artem的建议也不错。
    猜你喜欢
    • 1970-01-01
    • 2016-11-22
    • 1970-01-01
    • 1970-01-01
    • 2013-01-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多