【问题标题】:Jms Template not receiving messages in transactionJms 模板未在事务中接收消息
【发布时间】:2018-02-13 19:22:45
【问题描述】:

配置类

 package jms;

import java.sql.SQLException;

import javax.jms.ConnectionFactory;
import javax.sql.DataSource;

import org.apache.activemq.jms.pool.PooledConnectionFactory;  
import org.apache.activemq.spring.ActiveMQConnectionFactory;  
import org.hsqldb.jdbc.JDBCDriver;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.ComponentScan;  
import org.springframework.context.annotation.Primary;  
import org.springframework.jdbc.datasource.DataSourceTransactionManager;  
import org.springframework.jdbc.datasource.DriverManagerDataSource;  
import org.springframework.jms.connection.JmsTransactionManager;  
import org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy;  
import org.springframework.jms.core.JmsTemplate;  
import org.springframework.transaction.PlatformTransactionManager;  
import org.springframework.transaction.annotation.EnableTransactionManagement;

@org.springframework.context.annotation.Configuration
@ComponentScan("jms")
@EnableTransactionManagement
public class Configuration {

    @Bean 
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL( "tcp://localhost:61616");

        return activeMQConnectionFactory;
    }

    @Bean 
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(connectionFactory());
        return jmsTemplate;
    }

    @Bean
    public JmsTransactionManager transactionManager() {
        JmsTransactionManager p = new JmsTransactionManager(connectionFactory());
        return p;
    }
}

接收器类。

导入 org.springframework.transaction.annotation.Transactional;

@Component
@Transactional
public class ReceiverClass {

   @Autowired
   JmsTemplate jmsTemplate;

   @Transactional
    void func() {
       while (true) {
           Message message = jmsTemplate.receive("tempQueue.queue");
           System.out.println(message.toString());

           throw new RuntimeException();
       }
   }
}

主类

package jms;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;


public class Main {

    public static void main(String[] args) {

        ApplicationContext applicationContext = new AnnotationConfigApplicationContext(Configuration.class);

        ReceiverClass r =  (ReceiverClass) applicationContext.getBean("receiverClass");

        r.func();
    }
}

Pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>spring</groupId>
  <artifactId>spring</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>war</packaging>

<dependencies>  
          <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>5.0.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.0.0.RELEASE</version>
        </dependency>


        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>5.0.0.RELEASE</version>
        </dependency>

        <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>5.0.0.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-spring</artifactId>
    <version>5.15.3</version>
</dependency>



</dependencies>  

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
    </build>

</project>

我想要的是使用 jmsTemplate 从队列中检索消息并进行一些处理,如果出现问题,检索到的消息将存储回队列中。但是我无法使用上述配置来实现它,并且在读取时消息会从队列中删除,即使抛出了一些异常。

【问题讨论】:

    标签: java spring-transactions spring-jms jmstemplate


    【解决方案1】:

    我刚刚进行了一个测试,它对我来说很好......

    @SpringBootApplication
    public class So48774170Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So48774170Application.class, args).close();
        }
    
        @Configuration
        @EnableTransactionManagement
        public static class Config {
    
            @Bean
            public ApplicationRunner runner(JmsTemplate template, Foo foo) {
                return args -> {
                    template.convertAndSend("foo", "bar");
                    try {
                        foo.test();
                    }
                    catch (RuntimeException e) {
                        // no op
                    }
                    System.out.println("OK:" + foo.test());
                };
            }
    
            @Bean
            @Primary
            public ConnectionFactory cf() {
                return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
            }
    
            @Bean
            public CachingConnectionFactory ccf(ConnectionFactory cf) {
                return new CachingConnectionFactory(cf);
            }
    
            @Bean
            public JmsTemplate template(CachingConnectionFactory ccf) {
                return new JmsTemplate(ccf);
            }
    
            @Bean
            public PlatformTransactionManager transactionManager(CachingConnectionFactory ccf) {
                return new JmsTransactionManager(ccf);
            }
    
        }
    
    }
    

    @Component
    public class Foo {
    
        @Autowired
        private JmsTemplate template;
    
        private int count;
    
        @Transactional
        public Message test() {
            this.template.setReceiveTimeout(5_000);
            Message received = template.receive("foo");
            System.out.println(received);
            if (this.count++ == 0) {
                throw new RuntimeException();
            }
            return received;
        }
    
    }
    

    ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-59919-1518553219166-4:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-59919-1518553219166-4:1:1:1, destination = queue://foo, transactionId = null, expiration = 0, timestamp = 1518553219346, arrival = 0, brokerInTime = 1518553219346, brokerOutTime = 1518553219359, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1030, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = bar}
    ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-59919-1518553219166-4:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-59919-1518553219166-4:1:1:1, destination = queue://foo, transactionId = null, expiration = 0, timestamp = 1518553219346, arrival = 0, brokerInTime = 1518553219346, brokerOutTime = 1518553219359, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 1030, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = bar}
    OK:ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:gollum.local-59919-1518553219166-4:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:gollum.local-59919-1518553219166-4:1:1:1, destination = queue://foo, transactionId = null, expiration = 0, timestamp = 1518553219346, arrival = 0, brokerInTime = 1518553219346, brokerOutTime = 1518553219359, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 1, size = 1030, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = bar}
    

    我建议你在调试器中运行;在func() 中放置一个断点并验证调用堆栈上有一个TransactionInterceptor,几个堆栈帧向下。如果没有,则表示@EnableTransactionManagement 代理机制由于某种原因无法正常工作。

    还可以尝试打开 DEBUG 日志记录,看看它是否提供任何线索。

    请注意,建议您在模板中使用CachingConnectionFactory,以避免为每个操作打开新连接。

    【讨论】:

    • 我尝试创建一个数据源 bean(用于 mysql )和 DataSourceTransactionManager 并尝试保存因运行时异常而失败的记录。我可以在堆栈跟踪中找到“TransactionInterceptor”,甚至可以回滚保存的记录。 (在这种情况下交易有效)。但我找不到带有 jmstemplate 和 jmsTransactionManager 的“TransactionInterceptor”。
    • 事务管理器的类型应该没有区别——当然,使用 DS TM 不会帮助你回滚——但事务脚手架不应该改变——所以你的其他东西有问题配置。如果您可以在某个地方发布一个完整、简单的示例来重现该问题,我可以看看。
    • 这是因为func() 不是public。不知道为什么它适用于 DS TM。
    • 这对我有用。实际上,对于数据源和数据源事务管理器,我使用的是不同的方法,即“public int Save () {...}”。所以是的,“公共”创造了差异。谢谢。
    • 你能分享一下你是怎么想出来的步骤吗?这将非常有帮助。
    猜你喜欢
    • 2012-04-05
    • 2015-12-03
    • 2011-10-09
    • 2016-01-24
    • 2012-01-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-12-12
    相关资源
    最近更新 更多