【发布时间】:2015-09-26 12:16:11
【问题描述】:
数据库写入没有像我预期的那样回滚。 我花了很多时间阅读软件文档和网络帖子。 我无法解决问题。
我希望你们能帮助我。
场景
- 我的应用程序从队列中提取消息,从队列中提取数据
消息,并将其写入数据库。 - 写入数据库的方法执行 2 个 SQL 插入。
- 第二次插入出现异常:org.postgresql.util.PSQLException:错误:重复键值违反唯一约束“table2_PK”
- 但是,第一个插入仍在提交到数据库中。
相关软件
- spring-boot 1.2.5.RELEASE
- atomikos-util 3.9.3(来自 spring-boot-starter-jta-atomikos 1.2.5.RELEASE)
- jooq 3.6.2
- postgresql 9.4-1201-jdbc41
- activemq-client 5.1.2
应用程序代码 - 我已将代码的相关部分粘贴在下面。
- GdmServer - 我的“服务器”类,它也声明了 Spring bean 配置
- PortSIQueue - 我的 JMS MessageListener 类
- 内核 - 我的工作类,即写入数据库的代码,由我的 MessageListener 调用的 Spring bean
如果有人能提供任何帮助,我将不胜感激。
谢谢
package com.sm.gis.gdm;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.postgresql.xa.PGXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.sm.gis.config.GisConfig;
@SpringBootApplication
@EnableJms
@EnableTransactionManagement
public class GdmServer {
@Autowired
ConfigurableApplicationContext context;
@Autowired
GisConfig gisConfig;
/**
* Starts the GDM Server
*/
public static void main(String[] args) {
SpringApplication.run(GdmServer.class, args);
}
// -------------------------------------------------------------------------
// Spring bean configurations
// -------------------------------------------------------------------------
@Bean
GisConfig gisConfig() {
return new GisConfig();
}
@Bean
PlatformTransactionManager transactionManager() throws SystemException {
JtaTransactionManager manager = new JtaTransactionManager();
manager.setTransactionManager( atomikosUserTransactionManager() );
manager.setUserTransaction ( atomikosUserTransaction() );
manager.setAllowCustomIsolationLevels(true);
return manager;
}
@Bean(initMethod = "init", destroyMethod = "close")
UserTransactionManager atomikosUserTransactionManager() throws SystemException {
UserTransactionManager manager = new UserTransactionManager();
manager.setStartupTransactionService(true);
manager.setForceShutdown(false);
manager.setTransactionTimeout( gisConfig.getTxnTimeout() );
return manager;
}
@Bean
UserTransaction atomikosUserTransaction() {
return new UserTransactionImp();
}
@Bean(initMethod = "init", destroyMethod = "close")
AtomikosDataSourceBean atomikosJdbcConnectionFactory() {
PGXADataSource pgXADataSource = new PGXADataSource();
pgXADataSource.setUrl( gisConfig.getGdbUrl() );
pgXADataSource.setUser( gisConfig.getGdbUser() );
pgXADataSource.setPassword( gisConfig.getGdbPassword() );
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(pgXADataSource);
xaDataSource.setUniqueResourceName("gdb");
xaDataSource.setPoolSize( gisConfig.getGdbPoolSize() );
return xaDataSource;
}
@Bean
DSLContext dslContext() {
DSLContext dslContext = DSL.using(atomikosJdbcConnectionFactory(), SQLDialect.POSTGRES);
return dslContext;
}
@Bean(initMethod = "init", destroyMethod = "close")
AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() {
ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory();
activeMQXAConnectionFactory.setBrokerURL( gisConfig.getMomBrokerUrl() );
AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean();
atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker");
atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
atomikosConnectionFactoryBean.setLocalTransactionMode(false);
return atomikosConnectionFactoryBean;
}
@Bean
DefaultMessageListenerContainer queueWrapperGDM() throws SystemException {
DefaultMessageListenerContainer messageSource = new DefaultMessageListenerContainer();
messageSource.setTransactionManager( transactionManager() );
messageSource.setConnectionFactory( atomikosJmsConnectionFactory() );
messageSource.setSessionTransacted(true);
messageSource.setConcurrentConsumers(1);
messageSource.setReceiveTimeout( gisConfig.getMomQueueGdmTimeoutReceive() );
messageSource.setDestinationName( gisConfig.getMomQueueGdmName() );
messageSource.setMessageListener( context.getBean("portSIQueue") );
return messageSource;
}
@Bean
JmsTemplate queueWrapperLIMS() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory( atomikosJmsConnectionFactory() );
jmsTemplate.setDefaultDestinationName( gisConfig.getMomQueueLimsName() );
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}
}
package com.sm.gis.gdm.ports;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.sm.gis.gdm.kernel.Kernel;
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler;
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS;
@Component
public class PortSIQueue implements MessageListener {
@Autowired
ConfigurableApplicationContext context;
@Autowired
GisMessageMarshaler queueMessageMashaler;
@Autowired
Kernel kernel;
@Override
@Transactional(rollbackFor = {Throwable.class})
public void onMessage(Message jmsMessage) {
TextMessage jmsTextMessage = (TextMessage) jmsMessage;
// Extract JMS message body...
String jmsPayload = "";
try {
jmsPayload = jmsTextMessage.getText();
} catch (JMSException e) {
throw new RuntimeException(e);
}
// Marshal XML text to object...
Object gisMessage = queueMessageMashaler.toObject(jmsPayload);
kernel.receiveCreateGenomicTestOrderInGIS( (CreateGenomicTestOrderInGIS) gisMessage );
}
}
package com.sm.gis.gdm.kernel;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
@Component
public class Kernel {
@Autowired
ConfigurableApplicationContext context;
@Autowired
DSLContext dslContext;
<snip>
public void receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) {
dslContext.insertInto(table1)
.set(...)
.set(...)
.execute();
dslContext.insertInto(table2)
.set(...)
.set(...)
.execute();
}
<snip>
}
【问题讨论】:
-
你为什么努力不使用 Spring Boot? Spring Boot 1.2 具有开箱即用的 Atomikos 支持和自动检测功能。您正在努力不使用和规避 Spring Boot 与不针对它的框架一起工作。
-
我没有试图绕过 Sprint Boot。我相信我已经按照 Spring 文档和 Atomkos、JOOQ 和 ActiveMQ 网站上的相关材料中描述的使用 Spring Boot 1.2 和 Atomikos 的说明进行操作。也就是说,我不是想给任何人或我制造麻烦。 :) 也许我误解了一些东西。这就是我发布问题的原因。
-
我怀疑您的异常翻译器可能是错误的。请你也发一下好吗?
标签: spring postgresql activemq jooq atomikos