【问题标题】:How to implement jms queue in spring boot如何在spring boot中实现jms队列
【发布时间】:2017-04-17 08:58:00
【问题描述】:

我已经使用初始化程序创建了一个 Spring Boot 项目,我正在尝试创建我的第一条消息,但我不知道从哪里开始。我熟悉使用 JEE 的相同过程,所以我想我需要创建一个工厂、一个发送者和一个消费者。

谁能帮帮我?

【问题讨论】:

    标签: spring-boot jms spring-jms


    【解决方案1】:

    最好的起点是projects getting started guide

    你的方法在一般意义上是正确的,但他就是骨架的样子。

    第一个 spring-boot 为您提供了一个完美的配置文件结构,如果您使用像 Netbeans 这样的智能 ide,那么通过添加 spring-boot 插件也会在属性文件中为您提供自动完成功能。由于 Spring 对每个代理的行为略有不同,因此在我的示例中,我将使用 ActiveMQ

    通过在我们的构建路径上添加 ActiveMQ,Spring Boot 将自动设置一个 ActiveMQ 代理。我们需要设置几个属性以使其成为内存代理,而无需连接池。我们可以通过为 Spring Boot 设置两个属性来做到这一点。

    spring.activemq.in-memory=true
    spring.activemq.pooled=false
    jms.bookmgrqueue.name=book-mgr-queue #queue name
    

    其他代理也可以进行类似的配置。

    首先,您开始设置 Spring 应用程序。您应该放置 @EnableJms 注释以启用 Jms 支持,然后设置一个新队列。

    例子

    @EnableJms
    @Configuration
    public class JmsConfiguration {
    
        @Autowired
        private BeanFactory springContextBeanFactory;
    
        @Bean
        public DefaultJmsListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) {
            DefaultJmsListenerContainerFactory factory =
                    new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setDestinationResolver(new BeanFactoryDestinationResolver(springContextBeanFactory));
            factory.setConcurrency("3-10");
            return factory;
        }
    
        @Bean
        public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) throws JMSException {
            return new JmsTemplate(connectionFactory);
        }
    
    }
    

    收听队列消息

    侦听器组件 (BookMgrQueueListener.java) 使用 Spring 的 @JmsListener 注释和选择器来读取具有给定 Operation 标头的消息。

    @Component
    public class BookMgrQueueListener implements Loggable{
    
        private final BookService bookService;
    
        @Autowired
        public BookMgrQueueListener(BookService bookService) {
            this.bookService = bookService;
        }
    
        @JmsListener(containerFactory = "containerFactory",
                     destination = "bookMgrQueueDestination",
                     selector = "Operation = 'Create'")
        public void processCreateBookMessage(BookDTO book) throws JMSException{
            bookService.createNew(book);
        }
    
        @JmsListener(containerFactory = "containerFactory",
                     destination = "bookMgrQueueDestination",
                     selector = "Operation = 'Update'")
        public void processUpdateBookMessage(BookDTO book) throws JMSException{
            bookService.update(book.getIsbn(), book);
        }
    
        @JmsListener(containerFactory = "containerFactory",
                     destination = "bookMgrQueueDestination",
                     selector = "Operation = 'Delete'")
        public void processDeleteBookMessage(BookDTO book) throws JMSException{
            bookService.delete(book.getIsbn());
        }
    
    }
    

    用于测试的活动 MQ

    为了测试配置,我们在一个新的配置文件 ActiveMqConfiguration.java 中设置了 activeMq 代理。

    @Configuration
    public class ActiveMqConfiguration {
    
        public static final String ADDRESS = "vm://localhost";
    
        private BrokerService broker;
    
        @Bean(name="bookMgrQueueDestination")
        public Destination bookMgrQueueDestination(@Value("${jms.bookmgrqueue.name}") String bookMgrQueueName)
                throws JMSException {
            return new ActiveMQQueue(bookMgrQueueName);
        }
    
        @PostConstruct
        public void startActiveMQ() throws Exception {
            broker = new BrokerService();
            // configure the broker
            broker.setBrokerName("activemq-broker");
            broker.setDataDirectory("target");
            broker.addConnector(ADDRESS);
            broker.setUseJmx(false);
            broker.setUseShutdownHook(false);
            broker.start();
        }
    
        @PreDestroy
        public void stopActiveMQ() throws Exception {
            broker.stop();
        }
    
        @Bean
        public ConnectionFactory connectionFactory() {
            return new ActiveMQConnectionFactory(ADDRESS + "?broker.persistent=false");
        }
    }
    

    我们正在测试用例中设置完整的应用程序上下文,但我们正在将侦听器中的 BookService 引用替换为 MockedBookService,我们将使用它来验证是否执行了正确的调用。

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(classes = Application.class, loader = SpringApplicationContextLoader.class)
    @WebAppConfiguration
    public class BookMgrQueueListenerIntegrationTest {
    
        @Autowired(required = false)
        private JmsTemplate jmsTemplate;
    
        @Autowired
        private BookMgrQueueListener bookMgrQueueListener;
    
    
        @Autowired(required = false)
        @Qualifier("bookMgrQueueDestination")
        private Destination bookMgrQueueDestination;
    
        @Mock
        private BookService mockBookService;
    
        @Captor
        private ArgumentCaptor<BookDTO> bookArgumentCaptor;
    
        @Before
        public void setUp(){
            MockitoAnnotations.initMocks(this);
            ReflectionTestUtils.setField(bookMgrQueueListener, "bookService", mockBookService);
        }
    
        /* ... tests */
    }
    

    最后,我们为所有操作添加测试,并验证是否使用正确的操作和参数调用了服务层。

    /* ... */
    public class BookMgrQueueListenerIntegrationTest {
        /* ... */
        @Test
        public void testSendCreateBookMessage(){
            BookDTO book =  new BookDTO("isbn", "title", "author");
            jmsTemplate.convertAndSend(bookMgrQueueDestination, book, Message -> {
                return OperationHeader.CREATE.applyToMessage(Message);
            });
            // verify
            verify(mockBookService).createNew(bookArgumentCaptor.capture());
            assertEquals(book.getIsbn(), bookArgumentCaptor.getValue().getIsbn());
            assertEquals(book.getTitle(), bookArgumentCaptor.getValue().getTitle());
            assertEquals(book.getAuthor(), bookArgumentCaptor.getValue().getAuthor());
        }
    
        @Test
        public void testSendUpdateBookMessage(){
            BookDTO book =  new BookDTO("isbn", "title", "author");
            jmsTemplate.convertAndSend(bookMgrQueueDestination, book, Message -> {
                return OperationHeader.UPDATE.applyToMessage(Message);
            });
            // verify
            verify(mockBookService).update(eq(book.getIsbn()), bookArgumentCaptor.capture());
            assertEquals(book.getIsbn(), bookArgumentCaptor.getValue().getIsbn());
            assertEquals(book.getTitle(),bookArgumentCaptor.getValue().getTitle());
            assertEquals(book.getAuthor(),bookArgumentCaptor.getValue().getAuthor());
        }
    
        @Test
        public void testSendDeleteBookMessage(){
            BookDTO book =  new BookDTO("isbn", "title", "author");
            jmsTemplate.convertAndSend(bookMgrQueueDestination, book, Message -> {
                return OperationHeader.DELETE.applyToMessage(Message);
            });
            // verify
            verify(mockBookService).delete(book.getIsbn());
        }
    

    我们很高兴!

    参考Integrate JMS queue into a Spring Application

    【讨论】:

    • 非常感谢!这很有帮助!
    • 很好的答案和参考资料。对那些想用 jmslistener 注释方法在 bean 上 Mockito.spy() 的人来说只是一个警告:间谍阻止它实际监听队列。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-05-19
    • 2022-01-07
    • 1970-01-01
    • 2020-09-13
    • 1970-01-01
    • 2015-05-12
    • 1970-01-01
    相关资源
    最近更新 更多