【问题标题】:Bitronix - JMS and JDBC - Message is dequeued on ExceptionBitronix - JMS 和 JDBC - 消息因异常而出队
【发布时间】:2018-11-26 11:01:14
【问题描述】:

我正在尝试将 Bitronix 事务管理器集成到我的 Spring 启动项目中,以一起管理 jdbc 和 jms 事务。我有两个数据库和一个用于 jms 的 ActiveMQ 代理。我已经在同一个事务中连接了数据库,但是当我尝试包含 JMS 时,它似乎不起作用。

这是我的 Bitronix 事务管理器配置:

@Configuration
@EnableTransactionManagement
public class BitronixJtaConfiguration {

    private static final Logger log = LoggerFactory.getLogger(BitronixJtaConfiguration.class);

    @Value("${bitronix.tm.serverId}")
    private String serverId;

    @Value("${bitronix.tm.journal.disk.logPart1Filename:}")
    private String logPart1Filename;

    @Value("${bitronix.tm.journal.disk.logPart2Filename:}")
    private String logPart2Filename;

    @Bean
    public bitronix.tm.Configuration transactionManagerServices() {
        bitronix.tm.Configuration configuration = TransactionManagerServices.getConfiguration();
        configuration.setServerId(serverId);
        if ("".equals(logPart1Filename) && "".equals(logPart2Filename)) {
            configuration.setJournal(null);
            log.info("Disable journal for testing.");
        } else {
            configuration.setLogPart1Filename(logPart1Filename);
            configuration.setLogPart2Filename(logPart2Filename);
        }
        return configuration;
    }

    @Bean
    public TransactionManager transactionManager() {
        return TransactionManagerServices.getTransactionManager();
    }

    @Bean
    public UserTransaction userTransaction() {
        return TransactionManagerServices.getTransactionManager();
    }

    @Bean
    public PlatformTransactionManager platformTransactionManager() {
        UserTransaction userTransaction = userTransaction();
        TransactionManager transactionManager = transactionManager();
        return new JtaTransactionManager(userTransaction, transactionManager);
    }
}

这是我的数据库配置类之一:

@Configuration
@EnableTransactionManagement
public class TransportationPlanDBConfig {

  private static final Logger LOGGER = LoggerFactory.getLogger("ppalfile");

  @Value("${tp.jdbc.driverClassName}")
  private String driverClassName;

  @Value("${tp.jdbc.username}")
  private String username;

  @Value("${tp.jdbc.url}")
  private String url;

  @Value("${tp.jdbc.password}")
  private String password;

  @Value("${tp.c3p0.max_size}")
  private int c3p0MaxSize;

  @Value("${tp.c3p0.min_size}")
  private int c3p0MinSize;

  @Value("${tp.c3p0.unreturned_connection_timeout}")
  private int c3p0UnreturnedConnectionTimeout;

  @Value("${tp.c3p0.acquire_increment}")
  private int c3p0AcquireIncrement;

  @Value("${tp.c3p0.max_idle_time}")
  private int c3p0MaxIdleTime;

  public TransportationPlanDBConfig() {
    // Empty constructor
  }

  @Bean(name = "tpds", destroyMethod = "close")
  public DataSource dataSource() {
    LOGGER.debug("Creating Transportation plan DS");
    PoolingDataSource poolingDataSource = new PoolingDataSource();
    poolingDataSource.setClassName(driverClassName);
    poolingDataSource.setUniqueName("tpds");
    Properties props = new Properties();
    props.put("url", url);
    props.put("user", username);
    props.put("password", password);
    poolingDataSource.setDriverProperties(props);
    poolingDataSource.setAllowLocalTransactions(true);
    poolingDataSource.setMaxPoolSize(c3p0MaxSize);
    poolingDataSource.init();
    return poolingDataSource;
  }

  @Bean(name = "tpJdbcTemplate")
  JdbcTemplate jdbcTemplate(@Qualifier("tpds") DataSource dataSource) {
    LOGGER.debug("Creating JdbcTemplate transport plan");
    JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
    LOGGER.debug(" JdbcTemplate Transport Plan created ");
    return jdbcTemplate;
  }

}

我的 ActiveMQ 配置类:

@Configuration
@EnableTransactionManagement
public class ActivesMQsConfiguration {

    @Bean
    public ConnectionFactory jmsConnectionFactoryLocal() {
        PoolingConnectionFactory btmPoolingConnectionFactory = new PoolingConnectionFactory();
        btmPoolingConnectionFactory.setClassName("org.apache.activemq.ActiveMQXAConnectionFactory");
        btmPoolingConnectionFactory.setUniqueName("AMQLocal");
        btmPoolingConnectionFactory.setMinPoolSize(1);
        btmPoolingConnectionFactory.setMaxPoolSize(5);
        btmPoolingConnectionFactory.setAllowLocalTransactions(true);
        btmPoolingConnectionFactory.setUser("admin");
        btmPoolingConnectionFactory.setPassword("admin");
        btmPoolingConnectionFactory.getDriverProperties().setProperty("brokerURL", "tcp://localhost:61616");
        btmPoolingConnectionFactory.init();
        return btmPoolingConnectionFactory;
    }



    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactoryLocal(
            @Qualifier("jmsConnectionFactoryLocal") ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setSessionTransacted(true);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

}

我的 JMS 监听器实现:

@Component
@Transactional
public class ContactTransactionReceiver {


    private int mensajesConsumer2 = 0;

    @Autowired
    @Qualifier("versionJdbcTemplate")
    private JdbcTemplate versionJdbcTemplate;

    @Autowired
    @Qualifier("tpJdbcTemplate")
    private JdbcTemplate tpjdbcTemplate;

    @Autowired
    private VersionsConfDao versionsConfDao;

    @Autowired
    private TrainDao trainDao;


    @Transactional(rollbackFor=Exception.class)
    @JmsListener(destination = "Consumer.consumer2.VirtualTopic.TopicPrueba")
    public void receiveMessageFromContacts2(Message message) throws Exception {
        mensajesConsumer2++;
        TextMessage txtMessage = (TextMessage) message;
        System.out.println("Segundo consumer:" + txtMessage.getText() + " recibidos:" + mensajesConsumer2);


        VersionsConf versionsconf = new VersionsConf("V" + mensajesConsumer2, "V" + mensajesConsumer2, false,new Timestamp(1L), 1);
        VersionsConf versionsResult = versionsConfDao.insertUpdate(versionJdbcTemplate, versionsconf);

        if (mensajesConsumer2 == 2) {
            throw new Exception();
        }

        Train train = new Train("101"+mensajesConsumer2, 1L, 2L, false, true, "atp");
        Train trainResult = trainDao.insertUpdate(tpjdbcTemplate, train);

        if (mensajesConsumer2 == 3) {
            throw new Exception();
        }
    }

}

根据我对 Bitronix 功能的理解,基于我的监听器实现:

  1. 在收到第一条消息时:必须在每个数据库中插入一行并将消息出列。 -> 这很好用。

  2. 第二条和第三条传入消息:由于异常必须插入 0 行并将消息保留在队列中。 -> 没有插入行,但消息已出队。

此外,我想补充一点,它在执行过程中记录了以下内容: [main] bitronix.tm.recovery.Recoverer:恢复提交 0 个悬空事务并回滚 4 个资源上的 0 个中止事务 [AMQLocal,vds,AMQRemote,tpds]

所以,我了解到两个经纪人和两个数据库都已注册。但是当监听器处理第二条消息时(它抛出一个异常),它会记录:

WARN 5740 [Session Task-1] bitronix.tm.twopc.Preparer : 使用 0 登记资源执行事务

对这个问题有任何想法吗??

您可以在以下位置找到完整代码:https://github.com/PedroRamirezTOR/spring-jms-jdbc-integration.git

谢谢!

【问题讨论】:

    标签: spring-boot mariadb activemq bitronix


    【解决方案1】:

    首先,recovery committed 0 dangling transaction(s) and rolled back 0 aborted transaction(s) on 4 resource(s) 消息会不时出现,这是完全正常的。只要提交和回滚的计数器为零,您就可以忽略它。

    executing transaction with 0 enlisted resource 日志看起来很真实。

    我高度怀疑您的 Spring 设置存在问题。我无论如何都不是 Spring 专家,但 DefaultJmsListenerContainerFactory 应该引用您的 Spring PlatformTransactionManager 实例,以便它知道它必须以事务方式工作,因此您应该调用 factory.setTransactionManager(PlatformTransactionManager)

    这至少应该让你进入下一步。

    【讨论】:

    • 非常感谢卢多维奇!你是对的!最后,我决定删除 BitronixJtaConfiguration 并使用嵌入式 Spring 的事务管理器,因为它不允许我以这种方式修改侦听器工厂。如果有人对解决方案感兴趣可以在以下位置找到:github.com/PedroRamirezTOR/spring-jms-jdbc-integration.git
    • 你的 github 链接达到 404。
    • 仅供参考,摆脱 BTM 将使您的程序在发生崩溃时部分回滚,因为 Spring 嵌入的事务管理器仅实现快乐路径,而没有任何崩溃恢复位。
    • 好的,我上传代码时出错了。明天我会再次上传。你的意思是,例如,如果在事务期间数据库连接丢失,那么事务中包含的其余资源将被提交?
    • 我想到的最简单的例子是,如果你的 java 进程在提交过程中崩溃。
    猜你喜欢
    • 2016-06-26
    • 1970-01-01
    • 2015-09-27
    • 1970-01-01
    • 1970-01-01
    • 2012-03-18
    • 1970-01-01
    • 2016-08-30
    • 1970-01-01
    相关资源
    最近更新 更多