【问题标题】:Spring Integration - JdbcPollingChannelAdapter commit instead of rollback on handled ExceptionsSpring Integration - JdbcPollingChannelAdapter 提交而不是回滚处理的异常
【发布时间】:2015-08-11 09:44:32
【问题描述】:

我将Spring 4.1.x APIsSpring Integration 4.1.x APIsSpring Integration Java DSL 1.0.x APIs 用于EIP 流,我们使用JdbcPollingChannelAdpater 作为流的入口点来使用来自Oracle 数据库表的消息。

即使我们在JdbcPollingChannelAdapterPoller 上配置了ErrorHandler,我们看到会话的Transaction 仍然回滚并且在RuntimeException 被抛出并正确处理时未提交ErrorHandler

读完这个帖子:Spring Transactions - Prevent rollback after unchecked exceptions (RuntimeException),我觉得不可能阻止回滚,而是强制提交。它是否正确?而且,如果有办法,在安全处理错误时强制提交而不是回滚的最干净的方法是什么?

当前配置:

IntegrationConfig.java:

@Bean
public MessageSource<Object> jdbcMessageSource() {

    JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(
            dataSource,
            "select * from SERVICE_TABLE where rownum <= 10 for update skip locked");
    adapter.setUpdateSql("delete from SERVICE_TABLE where SERVICE_MESSAGE_ID in (:id)");
    adapter.setRowMapper(serviceMessageRowMapper);
    adapter.setMaxRowsPerPoll(1);
    adapter.setUpdatePerRow(true);
    return adapter;
}

@SuppressWarnings("unchecked")
@Bean
public IntegrationFlow inFlow() {

    return IntegrationFlows
            .from(jdbcMessageSource(),
                    c -> {
                        c.poller(Pollers.fixedRate(100)
                                .maxMessagesPerPoll(10)
                                .transactional(transactionManager)
                                .errorHandler(errorHandler));
                    })
                .channel(inProcessCh()).get();
}

ErrorHandler.java

@Component
public class ErrorHandler implements org.springframework.util.ErrorHandler {

    @Autowired
    private PlatformTransactionManager transactionManager;

    private static final Logger logger = LogManager.getLogger();

    @Override
    public void handleError(Throwable t) {

        logger.trace("handling error:{}", t.getMessage(), t);

        // handle error code here...

        // we want to force commit the transaction here?
        TransactionStatus txStatus = transactionManager.getTransaction(null);
        transactionManager.commit(txStatus);
    }
}

--- 已编辑以包含 ExpressionEvaluatingRequestHandlerAdvice Bean ---

@Bean
public Advice expressionEvaluatingRequestHandlerAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice expressionEvaluatingRequestHandlerAdvice = new ExpressionEvaluatingRequestHandlerAdvice();
    expressionEvaluatingRequestHandlerAdvice.setTrapException(true);
    expressionEvaluatingRequestHandlerAdvice.setOnSuccessExpression("payload");
    expressionEvaluatingRequestHandlerAdvice
            .setOnFailureExpression("payload");
    expressionEvaluatingRequestHandlerAdvice.setFailureChannel(errorCh());
    return expressionEvaluatingRequestHandlerAdvice;
}

--- 已编辑以显示虚拟测试消息处理程序 ---

    .handle(Message.class,
                    (m, h) -> {

                        boolean forceTestError = m.getHeaders().get("forceTestError");
                        if (forceTestError) {
                            logger.trace("simulated forced TestException");
                            TestException testException = new TestException(
                                    "forced test exception");
                            throw testException;
                        }

                        logger.trace("simulated successful process");

                        return m;
                    }, e-> e.advice(expressionEvaluatingRequestHandlerAdvice())

--- 已编辑以显示 ExecutorChannelInterceptor 方法 ---

@Bean
public IntegrationFlow inFlow() {

    return IntegrationFlows
            .from(jdbcMessageSource(), c -> {
                c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(10)
                        .transactional(transactionManager));
            })
            .enrichHeaders(h -> h.header("errorChannel", errorCh(), true))
            .channel(
                    MessageChannels.executor("testSyncTaskExecutor",
                            syncTaskExecutor()).interceptor(
                            testExecutorChannelInterceptor()))
            .handle(Message.class, (m, h) -> {
                    boolean forceTestError = m.getHeaders().get("forceTestError");
                    if (forceTestError) {
                        logger.trace("simulated forced TestException");
                        TestException testException = new TestException(
                                "forced test exception");
                        throw testException;
                    }

                    logger.trace("simulated successful process");
            }).channel("nullChannel").get();
}

【问题讨论】:

    标签: spring spring-integration spring-jdbc spring-java-config


    【解决方案1】:

    它不会因为你的ErrorHandler 在 TX 完成后已经工作了。

    这里有几行源代码 (AbstractPollingEndpoint.Poller):

    @Override
    public void run() {
        taskExecutor.execute(new Runnable() {
            @Override
            public void run() {
    
               .............
                    try {
                            if (!pollingTask.call()) {
                                break;
                            }
                            count++;
                        }
                        catch (Exception e) {
                ....
                        }
                    }
                }
            });
        }
    

    地点:

    1. ErrorHandler 默认应用于taskExecutor (SyncTaskExecutor)。

    2. TransactionInterceptorAspect 被应用于 pollingTask 周围的代理。

    因此 TX 在 pollingTask.call() 周围完成并熄灭。只有在那之后,您的 ErrorHandler 才开始在 taskExecutor.execute() 内部工作。

    要解决您的问题,您需要确定哪个下游流程部分对 TX 回调不是那么重要,并在那里设置一些 try...catch 或使用 ExpressionEvaluatingRequestHandlerAdvice 来“消除”RuntimeException

    但正如你所注意到的,我的推理必须在 TX 内完成。

    【讨论】:

    • 我们确实使用try...catch 处理错误,但是我们接下来要做的(对于我们所有的流程),将错误包装到特定于业务的RuntimeException 中,该RuntimeException 会被退回并路由到业务-具体errorChannel。我创建了ExpressionEvaluatingRequestHandlerAdvice(参见主要问题中的bean)。为了测试,我创建了一个带有简单服务的虚拟流程,其中包含上述建议,但现在,成功的流程或错误流程测试都没有显示 tx 提交正在发生。是否有一个 spring-integration-java-dsl 示例显示此建议在某处使用?
    • 实际上,看起来 Advice 配置正确,并且在成功和错误过程测试期间正在提交事务。 (抱歉,Eclipse IDE 没有显示测试已在幕后断点处暂停,因此提交未完成)
    • 还有一个问题,在我的测试中,我用 Advice 包装了一个特定的消息处理程序(服务)。有没有办法在全局范围内设置这个 Advice,以便它拦截 Spring Context 流中的所有消息处理程序,或者至少跨给定同步事务流中的所有消息处理程序?
    • 是的,有可能。通过标准 Spring AOP 框架,bean 名称为 MessageHandlers 或只是 MessageHandler.handleMessage() 切入点模式。
    • 好的。我看我是不对的。 ExecutorSubscribableChannel 在执行triggerAfterMessageHandled 后有一个代码重新抛出异常。因此,内部的任何操作都无济于事。我只看到try...catch 目标服务以防止回滚或其他任何事情的方式......否则你应该等待jira.spring.io/browse/INT-3770 或依赖ExpressionEvaluatingRequestHandlerAdvice 为每个&lt;service-activator&gt;
    猜你喜欢
    • 2019-03-11
    • 1970-01-01
    • 2021-04-12
    • 1970-01-01
    • 2021-12-10
    • 2012-07-10
    • 2014-03-03
    • 2017-11-15
    • 1970-01-01
    相关资源
    最近更新 更多