【问题标题】:no rollback when using Spring, JOOQ, Postgres, and ActiveMQ使用 Spring、JOOQ、Postgres 和 ActiveMQ 时不回滚
【发布时间】:2015-09-26 12:16:11
【问题描述】:

数据库写入没有像我预期的那样回滚。 我花了很多时间阅读软件文档和网络帖子。 我无法解决问题。

我希望你们能帮助我。

场景

  • 我的应用程序从队列中提取消息,从队列中提取数据
    消息,并将其写入数据库。
  • 写入数据库的方法执行 2 个 SQL 插入。
  • 第二次插入出现异常:org.postgresql.util.PSQLException:错误:重复键值违反唯一约束“table2_PK”
  • 但是,第一个插入仍在提交到数据库中。

相关软件

  1. spring-boot 1.2.5.RELEASE
  2. atomikos-util 3.9.3(来自 spring-boot-starter-jta-atomikos 1.2.5.RELEASE)
  3. jooq 3.6.2
  4. postgresql 9.4-1201-jdbc41
  5. activemq-client 5.1.2

应用程序代码 - 我已将代码的相关部分粘贴在下面。

  1. GdmServer - 我的“服务器”类,它也声明了 Spring bean 配置
  2. PortSIQueue - 我的 JMS MessageListener 类
  3. 内核 - 我的工作类,即写入数据库的代码,由我的 MessageListener 调用的 Spring bean

如果有人能提供任何帮助,我将不胜感激。

谢谢


package com.sm.gis.gdm;

import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.postgresql.xa.PGXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.sm.gis.config.GisConfig;

@SpringBootApplication
@EnableJms
@EnableTransactionManagement
public class GdmServer {

    @Autowired
    ConfigurableApplicationContext  context;
    @Autowired
    GisConfig                       gisConfig;

    /**
     * Starts the GDM Server
     */
    public static void main(String[] args) {
        SpringApplication.run(GdmServer.class, args);
    }

    // -------------------------------------------------------------------------
    // Spring bean configurations
    // -------------------------------------------------------------------------

    @Bean
    GisConfig gisConfig() {
        return new GisConfig();
    }

    @Bean
    PlatformTransactionManager transactionManager() throws SystemException {
        JtaTransactionManager manager = new JtaTransactionManager();
        manager.setTransactionManager( atomikosUserTransactionManager() );
        manager.setUserTransaction   ( atomikosUserTransaction() );
        manager.setAllowCustomIsolationLevels(true);
        return manager;
    }

    @Bean(initMethod = "init", destroyMethod = "close")
    UserTransactionManager atomikosUserTransactionManager() throws SystemException {
        UserTransactionManager manager = new UserTransactionManager();
        manager.setStartupTransactionService(true);
        manager.setForceShutdown(false);
        manager.setTransactionTimeout( gisConfig.getTxnTimeout() );
        return manager;
    }

    @Bean
    UserTransaction atomikosUserTransaction() {
        return new UserTransactionImp();
    }

    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosDataSourceBean atomikosJdbcConnectionFactory() {
        PGXADataSource pgXADataSource = new PGXADataSource();
        pgXADataSource.setUrl( gisConfig.getGdbUrl() );
        pgXADataSource.setUser( gisConfig.getGdbUser() );
        pgXADataSource.setPassword( gisConfig.getGdbPassword() );

        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(pgXADataSource);
        xaDataSource.setUniqueResourceName("gdb");
        xaDataSource.setPoolSize( gisConfig.getGdbPoolSize() );
        return xaDataSource;
    }

    @Bean
    DSLContext dslContext() {
        DSLContext dslContext = DSL.using(atomikosJdbcConnectionFactory(), SQLDialect.POSTGRES);
        return dslContext;
    }

    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() {
        ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory();
        activeMQXAConnectionFactory.setBrokerURL( gisConfig.getMomBrokerUrl() );

        AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean();
        atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker");
        atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
        atomikosConnectionFactoryBean.setLocalTransactionMode(false);
        return atomikosConnectionFactoryBean;
    }

    @Bean
    DefaultMessageListenerContainer queueWrapperGDM() throws SystemException {
        DefaultMessageListenerContainer messageSource = new DefaultMessageListenerContainer();
        messageSource.setTransactionManager( transactionManager() );
        messageSource.setConnectionFactory( atomikosJmsConnectionFactory() );
        messageSource.setSessionTransacted(true);
        messageSource.setConcurrentConsumers(1);
        messageSource.setReceiveTimeout( gisConfig.getMomQueueGdmTimeoutReceive() );
        messageSource.setDestinationName( gisConfig.getMomQueueGdmName() );
        messageSource.setMessageListener( context.getBean("portSIQueue") );
        return messageSource;
    }

    @Bean
    JmsTemplate queueWrapperLIMS() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory( atomikosJmsConnectionFactory() );
        jmsTemplate.setDefaultDestinationName( gisConfig.getMomQueueLimsName() );
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }

}

package com.sm.gis.gdm.ports;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.sm.gis.gdm.kernel.Kernel;
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler;
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS;

@Component
public class PortSIQueue implements MessageListener {

    @Autowired
    ConfigurableApplicationContext  context;
    @Autowired
    GisMessageMarshaler             queueMessageMashaler;
    @Autowired
    Kernel                          kernel;

    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public void onMessage(Message jmsMessage) {

        TextMessage jmsTextMessage = (TextMessage) jmsMessage;

        // Extract JMS message body...
        String jmsPayload = "";
        try {
            jmsPayload = jmsTextMessage.getText();
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }

        // Marshal XML text to object...
        Object gisMessage = queueMessageMashaler.toObject(jmsPayload);

        kernel.receiveCreateGenomicTestOrderInGIS( (CreateGenomicTestOrderInGIS) gisMessage );
    }

}

package com.sm.gis.gdm.kernel;

import org.jooq.DSLContext;
import org.jooq.impl.DSL;

@Component
public class Kernel {

    @Autowired
    ConfigurableApplicationContext  context;
    @Autowired
    DSLContext                      dslContext;

<snip>
    public void receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) {

            dslContext.insertInto(table1)
                .set(...)
                .set(...)
            .execute();

            dslContext.insertInto(table2)
                .set(...)
                .set(...)
            .execute();
    }
<snip>
}

【问题讨论】:

  • 你为什么努力不使用 Spring Boot? Spring Boot 1.2 具有开箱即用的 Atomikos 支持和自动检测功能。您正在努力不使用和规避 Spring Boot 与不针对它的框架一起工作。
  • 我没有试图绕过 Sprint Boot。我相信我已经按照 Spring 文档和 Atomkos、JOOQ 和 ActiveMQ 网站上的相关材料中描述的使用 Spring Boot 1.2 和 Atomikos 的说明进行操作。也就是说,我不是想给任何人或我制造麻烦。 :) 也许我误解了一些东西。这就是我发布问题的原因。
  • 我怀疑您的异常翻译器可能是错误的。请你也发一下好吗?

标签: spring postgresql activemq jooq atomikos


【解决方案1】:

我是个白痴。 原来问题是由于我的应用程序逻辑中的缺陷造成的。 如果第一次尝试处理消息失败并出现异常,ActiveMQ 组件会重试消息。为第一次尝试创建的事务正确回滚。这是第二次成功的尝试。重试成功,因为在第一次尝试期间应用程序逻辑增加了数据库序列号,而第二次尝试未导致重复键违规。更正应用程序逻辑缺陷后,由于在我的应用程序中无论如何都没有消息可重试,所以我也关闭了重试。 对于浪费阅读我帖子的人的时间,我深表歉意。

在此过程中,我确实对实现进行了一些更改。这些更改使某些默认值显式选择。我保留了这些更改,因为我相信它们将使我团队中的其他开发人员更容易更快地了解正在发生的事情。我还保留了 JOOQ 异常翻译代码,因为在其他情况下需要它并且无论如何似乎是最佳实践。

我在这篇文章中包含了修改后的代码,以防其他人发现它有用。


package com.sm.gis.gdm;

import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DefaultConfiguration;
import org.jooq.impl.DefaultDSLContext;
import org.jooq.impl.DefaultExecuteListenerProvider;
import org.postgresql.xa.PGXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.sm.gis.config.GisConfig;

@SpringBootApplication
@EnableJms
@EnableTransactionManagement(proxyTargetClass=true)
public class GdmServer {

    @Autowired
    ConfigurableApplicationContext   context;
    @Autowired
    GisConfig                        gisConfig;

    /**
     * Starts the GDM Server
     */
    public static void main(String[] args) {
        SpringApplication.run(GdmServer.class, args);
    }

    // -------------------------------------------------------------------------
    // Spring bean configurations
    // -------------------------------------------------------------------------

    @Bean
    GisConfig gisConfig() {
        return new GisConfig();
    }

    @Bean
    @DependsOn({ "atomikosUserTransactionManager", "atomikosUserTransaction", "atomikosJdbcConnectionFactory", "atomikosJmsConnectionFactory" })
    PlatformTransactionManager transactionManager() throws SystemException {
        JtaTransactionManager manager = new JtaTransactionManager();
        manager.setTransactionManager( atomikosUserTransactionManager() );
        manager.setUserTransaction( atomikosUserTransaction() );
        manager.setAllowCustomIsolationLevels(true);
        manager.afterPropertiesSet();
        return manager;
    }

    @Bean(initMethod = "init", destroyMethod = "close")
    UserTransactionManager atomikosUserTransactionManager() throws SystemException {
        UserTransactionManager manager = new UserTransactionManager();
        manager.setStartupTransactionService(true);
        manager.setForceShutdown(false);
        manager.setTransactionTimeout( gisConfig.getTxnTimeout() );
        return manager;
    }

    @Bean
    UserTransaction atomikosUserTransaction() {
        return new UserTransactionImp();
    }

    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosDataSourceBean atomikosJdbcConnectionFactory() throws Exception {
        PGXADataSource pgXADataSource = new PGXADataSource();
        pgXADataSource.setUrl( gisConfig.getGdbUrl() );
        pgXADataSource.setUser( gisConfig.getGdbUser() );
        pgXADataSource.setPassword( gisConfig.getGdbPassword() );

        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(pgXADataSource);
        xaDataSource.setUniqueResourceName("gdb");
        xaDataSource.setPoolSize( gisConfig.getGdbPoolSize() );
        xaDataSource.setTestQuery("SELECT 1");
        xaDataSource.afterPropertiesSet();
        return xaDataSource;
    }

    @Bean
    @DependsOn({ "atomikosJdbcConnectionFactory" })
    DSLContext dslContext() throws Exception {
        DefaultConfiguration jooqConfiguration = new DefaultConfiguration();
        jooqConfiguration.set( SQLDialect.POSTGRES_9_4 );
        jooqConfiguration.set( atomikosJdbcConnectionFactory() );
        jooqConfiguration.set( new DefaultExecuteListenerProvider(new JooqToSpringExceptionTransformer()) );
        DSLContext dslContext = new DefaultDSLContext(jooqConfiguration);
        return dslContext;
    }


    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0);
        redeliveryPolicy.setRedeliveryDelay(0);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(0);

        ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory();
        activeMQXAConnectionFactory.setBrokerURL( gisConfig.getMomBrokerUrl() );
        activeMQXAConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

        AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean();
        atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker");
        atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
        atomikosConnectionFactoryBean.setLocalTransactionMode(false);
        return atomikosConnectionFactoryBean;
    }

    @Bean
    @DependsOn({ "transactionManager" })
    DefaultMessageListenerContainer queueWrapperGDM() throws SystemException {
        DefaultMessageListenerContainer messageSource = new DefaultMessageListenerContainer();
        messageSource.setTransactionManager( transactionManager() );
        messageSource.setConnectionFactory( atomikosJmsConnectionFactory() );
        messageSource.setSessionTransacted(true);
        messageSource.setSessionAcknowledgeMode(0);
        messageSource.setConcurrentConsumers(1);
        messageSource.setReceiveTimeout( gisConfig.getMomQueueGdmTimeoutReceive() );
        messageSource.setDestinationName( gisConfig.getMomQueueGdmName() );
        messageSource.setMessageListener( context.getBean("portSIQueue") );
        messageSource.afterPropertiesSet();
        return messageSource;
    }

    @Bean
    @DependsOn({ "transactionManager" })
    JmsTemplate queueWrapperLIMS() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory( atomikosJmsConnectionFactory() );
        jmsTemplate.setDefaultDestinationName( gisConfig.getMomQueueLimsName() );
        jmsTemplate.setSessionTransacted(true);
        jmsTemplate.setSessionAcknowledgeMode(0);
        return jmsTemplate;
    }

}

package com.sm.gis.gdm.ports;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import com.sm.gis.gdm.kernel.Kernel;
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler;
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS;

@Component
public class PortSIQueue implements MessageListener {

    @Autowired
    ConfigurableApplicationContext    context;
    @Autowired
    GisMessageMarshaler               queueMessageMashaler;
    @Autowired
    Kernel                            kernel;

    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = {Throwable.class})
    public void onMessage(Message jmsMessage) {

        TextMessage jmsTextMessage = (TextMessage) jmsMessage;

        // Extract JMS message body...
        String jmsPayload = "";
        try {
            jmsPayload = jmsTextMessage.getText();
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }

        // Marshal XML text to object...
        Object gisMessage = queueMessageMashaler.toObject(jmsPayload);

        kernel.receiveCreateGenomicTestOrderInGIS( (CreateGenomicTestOrderInGIS) gisMessage );

}

package com.sm.gis.gdm.kernel;

import org.jooq.DSLContext;

@Component
public class Kernel {

    @Autowired
    ConfigurableApplicationContext  context;
    @Autowired
    DSLContext                      dslContext;

<snip>
    public void receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) {

        dslContext.insertInto(table1)
            .set(...)
            .set(...)
        .execute();

        dslContext.insertInto(table2)
            .set(...)
            .set(...)
        .execute();
    }
<snip>
}

【讨论】:

    【解决方案2】:

    使用 Transactional Annotation 时遇到类似问题。必须在 try/catch 中使用 (begin..commit)/rollback 显式处理事务。不是很优雅和重复但有效。 TransactionContext 保存在当前线程中。所以你的 begin 方法不需要返回 ctx 对象。 TransactionContext 可以使用您的 DSLContext.configuration() 实例化。

    公共类 DataSourceTransactionProvider 实现 TransactionProvider { 私有最终 DataSourceTransactionManager txMgr;

    @Inject
    public DataSourceTransactionProvider(DataSourceTransactionManager transactionManager) {
        this.txMgr = transactionManager;
    }
    
    @Override
    public void begin(TransactionContext ctx) throws DataAccessException {
        TransactionStatus transactionStatus = txMgr.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_NESTED));
        ctx.transaction(new DBTransaction(transactionStatus));
    }
    
    @Override
    public void commit(TransactionContext ctx) throws DataAccessException {
        txMgr.commit(((DBTransaction) ctx.transaction()).transactionStatus);
    }
    
    @Override
    public void rollback(TransactionContext ctx) throws DataAccessException {
        txMgr.rollback(((DBTransaction) ctx.transaction()).transactionStatus);
    }
    

    }

    【讨论】:

      猜你喜欢
      • 2019-07-09
      • 1970-01-01
      • 2013-02-16
      • 2018-03-27
      • 1970-01-01
      • 2017-08-19
      • 2020-01-21
      • 2014-11-05
      • 2011-05-27
      相关资源
      最近更新 更多