Spring Boot整合ActiveMQ
ActiveMQ概述
官网:http://activemq.apache.org/
MQ全称是MessageQueue(消息队列),是一个消息的接收和转发的容器,主要用于消息的推送。ActiveMQ是Apache提供的一个开源消息中间件,纯Java技术实现。
ActiveMQ的安装(windows)
- 打开下载官网:http://activemq.apache.org/download.html
- 将下载的压缩包解压到对应的文件目录下,将看到下图的文件目录
- 启动和关闭
命令行启动,进入activemq解压后的bin目录执行activemq start命令
命令行关闭,进入activemq解压后的bin目录执行activemq stop命令
图标启动方法:根据32还是64位操作系统,进入win32或者win64,双击activemq.bat即可启动。
另外还提供了将activemq安装到window系统的服务中的方法,分别是InstallService.bat和UnistatllService.bat
安装到windows服务说明(如果经常使用到,想跟随系统自动启动可以设置)
- 双击InstallService.bat
- 计算机->管理->服务和应用程序->服务->ActiveMQ
,可以看到是“自动”启动的,就是根据系统启动就自动启动。可以点击进去设置为手动。如果在项目开发中经常使用到可以设置为自动启动。
- 卸载安装到window服务,双击UninstallService.bat。刷新服务就可以看到ActiveMQ没了。
打开主页:localhost:8161/admin 默认账号密码:admin/admin
Spring Boot集成ActiveMQ
Spring Boot官方文档(33.1.1 ActiveMQ Support目录,配置熟悉PartX.Appendices目录):https://docs.spring.io/spring-boot/docs/2.1.2.RELEASE/reference/htmlsingle/#boot-features-activemq
源码地址:https://github.com/superRabbitMan/spring-boot-activemq
ActiveMQ配置属性大全
# ACTIVEMQ (ActiveMQProperties)
spring.activemq.broker-url= # URL of the ActiveMQ broker. Auto-generated by default.
spring.activemq.close-timeout=15s # Time to wait before considering a close complete.
spring.activemq.in-memory=true # Whether the default broker URL should be in memory. Ignored if an explicit broker has been specified.
spring.activemq.non-blocking-redelivery=false # Whether to stop message delivery before re-delivering messages from a rolled back transaction. This implies that message order is not preserved when this is enabled.
spring.activemq.password= # Login password of the broker.
spring.activemq.send-timeout=0ms # Time to wait on message sends for a response. Set it to 0 to wait forever.
spring.activemq.user= # Login user of the broker.
spring.activemq.packages.trust-all= # Whether to trust all packages.
spring.activemq.packages.trusted= # Comma-separated list of specific packages to trust (when not trusting all packages).
spring.activemq.pool.block-if-full=true # Whether to block when a connection is requested and the pool is full. Set it to false to throw a "JMSException" instead.
spring.activemq.pool.block-if-full-timeout=-1ms # Blocking period before throwing an exception if the pool is still full.
spring.activemq.pool.enabled=false # Whether a JmsPoolConnectionFactory should be created, instead of a regular ConnectionFactory.
spring.activemq.pool.idle-timeout=30s # Connection idle timeout.
spring.activemq.pool.max-connections=1 # Maximum number of pooled connections.
spring.activemq.pool.max-sessions-per-connection=500 # Maximum number of pooled sessions per connection in the pool.
spring.activemq.pool.time-between-expiration-check=-1ms # Time to sleep between runs of the idle connection eviction thread. When negative, no idle connection eviction thread runs.
spring.activemq.pool.use-anonymous-producers=true # Whether to use only one anonymous "MessageProducer" instance. Set it to false to create one "MessageProducer" every time one is required.
整合
POM
首先需要引入对应的POM包,本地引入jar包如下:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.properties
配置文件中的activemq配置如下:
# activemq配置
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.in-memory=true
spring.activemq.pool.enabled=false
spring.activemq.packages.trust-all=true
消息生产者
importorg.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.jms.Destination;
/**
* 消息生产者
* Created by vip on 2019/1/23.
*/
@Component
public class MessageProducer {
@Resource
private JmsMessagingTemplatejmsTemplate;
public void sendMessage(Destination destination, String message) {
jmsTemplate.convertAndSend(destination, message);
}
}
消息消费者
importorg.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**消息的小肥猪
* Created by vip on 2019/1/23.
*/
@Component
public class MessageConsumer {
@JmsListener(destination = "message.queue")
public void receiveQueue(String text) {
System.out.println("收到消息 = ["+ text + "]");
}
}
测试
importcom.example.demo.producer.MessageProducer;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import javax.jms.Destination;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootActivemqApplicationTests {
@Resource
private MessageProducermessageProducer;
@Test
public void contextLoads() {
Destination destination = new ActiveMQQueue("message.queue");
messageProducer.sendMessage(destination, "今天天气不错!");
}
}
在控制台中可以看到消息生产发布出去后,就已经被消费掉了。
在activemq管理后台看到了刚才推送的队列名称和消息次数: