前言:本文是基于Spring和ActiveMQ的一个示例文章,包括了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,只是做了比较简单的实现,无任何业务方面的东西,作为一个入门教程。

适合对象:希望学习ActiveMQ的朋友,以及利用Spring将ActiveMQ集成到系统中

所需知识:Springframework,JMS,如果不清楚可以看看http://zh.wikipedia.org/zh-cn/Java消息服务

下载地址:http://pan.baidu.com/s/1c0ms8mc

目录结构

ActiveMQ 实现队列消息和PUB/SUB模型

Maven依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
<!-- Jar版本管理 -->
    <properties>
        <springframework>4.0.2.RELEASE</springframework>
        <log4j>1.2.17</log4j>
        <activemq>5.9.0</activemq>
    </properties>
 
    <dependencies>
        <!-- Spring web mvc -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${springframework}</version>
        </dependency>
 
        <!-- 提供JMS,Freemarker,Quartz集成服务 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${springframework}</version>
        </dependency>
         
        <!-- 集成JMS -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${springframework}</version>
        </dependency>
         
        <!-- xbean 如<amq:connectionFactory /> -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>
 
        <!-- log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j}</version>
        </dependency>
 
        <!-- Active MQ -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>${activemq}</version>
        </dependency>
 
        <!-- 单元测试 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

 

jar包截图

ActiveMQ 实现队列消息和PUB/SUB模型

web.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
<?xml version="1.0" encoding="UTF-8"?>
<!-- 通过http://java.sun.com/xml/ns/javaee/获取最新的schemaLocation -->
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    version="3.0">
    <display-name>SpringActivemqServer</display-name>
 
    <!-- WebAppRootKey -->
    <context-param>
        <param-name>webAppRootKey</param-name>
        <param-value>example.SpringActivemqServer</param-value>
    </context-param>
 
    <!-- Log4J Start -->
    <context-param>
        <param-name>log4jConfigLocation</param-name>
        <param-value>classpath:log4j.properties</param-value>
    </context-param>
    <context-param>
        <param-name>log4jRefreshInterval</param-name>
        <param-value>6000</param-value>
    </context-param>
    <!-- Spring Log4J config -->
    <listener>
        <listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
    </listener>
    <!-- Log4J End -->
 
    <!-- Spring 编码过滤器 start -->
    <filter>
        <filter-name>characterEncoding</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>characterEncoding</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>
    <!-- Spring 编码过滤器 End -->
 
    <!-- Spring Application Context Listener Start -->
    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath*:applicationContext.xml,classpath*:ActiveMQ.xml</param-value>
    </context-param>
    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>
    <!-- Spring Application Context Listener End -->
 
 
    <!-- Spring MVC Config Start -->
    <servlet>
        <servlet-name>SpringMVC</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
 
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:spring-mvc.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>SpringMVC</servlet-name>
        <!-- Filter all resources -->
        <url-pattern>/</url-pattern>
    </servlet-mapping>
    <!-- Spring MVC Config End -->
 
</web-app>

applicationContext.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd">
      
     <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>
      
     <!-- 配置扫描路径 -->
     <context:component-scan base-package="org.xdemo.example">
       <!-- 只扫描Service,也可以添加Repostory,但是要把Controller排除在外,Controller由spring-mvc.xml去加载 -->
       <!-- <context:include-filter type="annotation" expression="org.springframework.stereotype.Service" /> -->
       <!-- <context:include-filter type="annotation" expression="org.springframework.stereotype.Repository" /> -->
       <!-- <context:include-filter type="annotation" expression="org.springframework.stereotype.Component" /> -->
       <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
     </context:component-scan>
 
</beans>

spring-mvc.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?xml version="1.0" encoding="UTF-8"?>  
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"   
       xmlns:aop="http://www.springframework.org/schema/aop"   
       xmlns:context="http://www.springframework.org/schema/context"  
       xmlns:mvc="http://www.springframework.org/schema/mvc"   
       xmlns:tx="http://www.springframework.org/schema/tx"   
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
       xsi:schemaLocation="http://www.springframework.org/schema/aop   
        http://www.springframework.org/schema/aop/spring-aop-4.0.xsd   
        http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd   
        http://www.springframework.org/schema/mvc   
        http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd   
        http://www.springframework.org/schema/tx   
        http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">  
   
      <!-- 启用MVC注解 -->
    <mvc:annotation-driven />
     
    <!-- 静态资源文件,不会被Spring MVC拦截 -->
    <mvc:resources location="/resources/" mapping="/resources/**"/>
     
    <!-- 指定Sping组件扫描的基本包路径 -->
    <context:component-scan base-package="org.xdemo.example" >
        <!-- 这里只扫描Controller,不可重复加载Service -->
        <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
    </context:component-scan>
     
      <!-- JSP视图解析器-->
    <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">  
        <property name="prefix" value="/WEB-INF/views/" />  
        <property name="suffix" value=".jsp" />
        <property name="order" value="1" />
    </bean>
     
     
</beans>

ActiveMQ.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
 
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
 
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
        <property name="sessionCacheSize" value="100" />
    </bean>
     
    <!-- ====Producer side start====-->
     
    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(发布/订阅),即队列模式 -->
        <property name="pubSubDomain" value="false" />
    </bean>
     
    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="connectionFactory" />
        <!-- pub/sub模型(发布/订阅) -->
        <property name="pubSubDomain" value="true" />
    </bean>
     
    <!-- ====Producer side end====-->
 
     
    <!-- ====Consumer side start====-->
     
    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.queue" ref="queueReceiver"/>
        <jms:listener destination="test.queue" ref="queueReceiver2"/>
    </jms:listener-container>
     
    <!-- 定义Topic监听器 -->
    <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="test.topic" ref="topicReceiver"/>
        <jms:listener destination="test.topic" ref="topicReceiver2"/>
    </jms:listener-container>
     
    <!-- ====Consumer side end==== -->
</beans>

队列消息生产者

QueueSender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package org.xdemo.example.SpringActivemq.mq.producer.queue;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
 
/**
 * @作者 Goofy
 * @邮件 252878950@qq.com
 * @日期 2014-4-1上午9:40:24
 * @描述 发送消息到队列
 */
@Component
public class QueueSender {
     
    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;//通过@Qualifier修饰符来注入对应的bean
     
    /**
     * 发送一条消息到指定的队列(目标)
     * @param queueName 队列名称
     * @param message 消息内容
     */
    public void send(String queueName,final String message){
        jmsTemplate.send(queueName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
     
}

主题(Topic)消息生产者TopicSender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package org.xdemo.example.SpringActivemq.mq.producer.topic;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
 
/**
 * @作者 Goofy
 * @邮件 252878950@qq.com
 * @日期 2014-4-1上午9:40:32
 * @描述 发送消息到主题
 */
@Component
public class TopicSender {
     
    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;
     
    /**
     * 发送一条消息到指定的队列(目标)
     * @param queueName 队列名称
     * @param message 消息内容
     */
    public void send(String topicName,final String message){
        jmsTemplate.send(topicName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
 
}

消费者

队列消费者QueueReceiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 
 */
package org.xdemo.example.SpringActivemq.mq.consumer.queue;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
 
import org.springframework.stereotype.Component;
 
/**
 * @作者 Goofy
 * @邮件 252878950@qq.com
 * @日期 2014-4-1上午10:11:51
 * @描述 队列消息监听器
 */
@Component
public class QueueReceiver implements MessageListener {
 
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("QueueReceiver1接收到消息:"+((TextMessage)message).getText());
        catch (JMSException e) {
            e.printStackTrace();
        }
    }
 
}

QueueReceiver2.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 
 */
package org.xdemo.example.SpringActivemq.mq.consumer.queue;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
 
import org.springframework.stereotype.Component;
 
/**
 * @作者 Goofy
 * @邮件 252878950@qq.com
 * @日期 2014-4-1上午10:11:51
 * @描述 队列消息监听器
 */
@Component
public class QueueReceiver2 implements MessageListener {
 
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("QueueReceiver2接收到消息:"+((TextMessage)message).getText());
        catch (JMSException e) {
            e.printStackTrace();
        }
    }
 
}

主题(Topic)消费者

TopicReceiver.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 
 */
package org.xdemo.example.SpringActivemq.mq.consumer.topic;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
 
import org.springframework.stereotype.Component;
 
/**
 * @作者 Goofy
 * @邮件 252878950@qq.com
 * @日期 2014-4-1上午10:13:47
 * @描述 Topic消息监听器
 */
@Component
public class TopicReceiver implements MessageListener{
 
 
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("TopicReceiver1接收到消息:"+((TextMessage)message).getText());
        catch (JMSException e) {
            e.printStackTrace();
        }
    }
     
}

TopicRecever.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 
 */
package org.xdemo.example.SpringActivemq.mq.consumer.topic;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
 
import org.springframework.stereotype.Component;
 
/**
 * @作者 Goofy
 * @邮件 252878950@qq.com
 * @日期 2014-4-1上午10:13:47
 * @描述 Topic消息监听器
 */
@Component
public class TopicReceiver2 implements MessageListener{
 
 
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("TopicReceiver2接收到消息:"+((TextMessage)message).getText());
        catch (JMSException e) {
            e.printStackTrace();
        }
    }
     
}

控制器Controller代码

ActivemqController.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
 
 */
package org.xdemo.example.SpringActivemq.controller;
 
import javax.annotation.Resource;
 
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.xdemo.example.SpringActivemq.mq.producer.queue.QueueSender;
import org.xdemo.example.SpringActivemq.mq.producer.topic.TopicSender;
 
/**
 * @作者 Goofy
 * @邮件 252878950@qq.com
 * @日期 2014-4-1上午10:54:11
 * @描述 测试 
 */
@Controller
@RequestMapping("/activemq")
public class ActivemqController {
     
    @Resource QueueSender queueSender;
    @Resource TopicSender topicSender;
     
    /**
     * 发送消息到队列
     * @param message
     * @return String
     */
    @ResponseBody
    @RequestMapping("queueSender")
    public String queueSender(@RequestParam("message")String message){
        String opt="";
        try {
            queueSender.send("test.queue", message);
            opt="suc";
        catch (Exception e) {
            opt=e.getCause().toString();
        }
        return opt;
    }
     
    /**
     * 发送消息到主题
     * @param message
     * @return String
     */
    @ResponseBody
    @RequestMapping("topicSender")
    public String topicSender(@RequestParam("message")String message){
        String opt="";
        try {
            topicSender.send("test.topic", message);
            opt="suc";
        catch (Exception e) {
            opt=e.getCause().toString();
        }
        return opt;
    }
     
}

前端页面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<%
    String path = request.getContextPath();
    String basePath = request.getScheme() + "://"
            + request.getServerName() + ":" + request.getServerPort()
            + path + "/";
%>
 
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
<base href="<%=basePath%>">
 
<title>ActiveMQ Demo程序</title>
 
<meta http-equiv="pragma" content="no-cache">
<meta http-equiv="cache-control" content="no-cache">
<meta http-equiv="expires" content="0">
<script type="text/javascript" src="<%=basePath%>resources/jquery-1.11.0.min.js"></script>
<style type="text/css">
.h1 {
    margin: 0 auto;
}
 
#producer{
    width: 48%;
    border: 1px solid blue;
    height: 80%;
    align:center;
    margin:0 auto;
}
 
body{
    text-align :center;
div {
    text-align :center;
}
textarea{
    width:80%;
    height:100px;
    border:1px solid gray;
}
button{
    rgb(6215666);
    border: none;
    font-weight: bold;
    color: white;
    height:30px;
}
</style>
<script type="text/javascript">
     
    function send(controller){
        if($("#message").val()==""){
            $("#message").css("border","1px solid red");
            return;
        }else{
            $("#message").css("border","1px solid gray");
        }
        $.ajax({
            type: 'post',
            url:'<%=basePath%>activemq/'+controller,
            dataType:'text'
            data:{"message":$("#message").val()},
            success:function(data){
                if(data=="suc"){
                    $("#status").html("<font color=green>发送成功</font>");
                    setTimeout(clear,1000);
                }else{
                    $("#status").html("<font color=red>"+data+"</font>");
                    setTimeout(clear,5000);
                }
            },
            error:function(data){
                $("#status").html("<font color=red>ERROR:"+data["status"]+","+data["statusText"]+"</font>");
                setTimeout(clear,5000);
            }
             
        });
    }
     
    function clear(){
        $("#status").html("");
    }
 
</script>
</head>
 
<body>
    <h1>Hello ActiveMQ</h1>
    <div id="producer">
        <h2>Producer</h2>
        <textarea id="message"></textarea>
        <br>
        <button onclick="send('queueSender')">发送的Queue</button>
        <button onclick="send('topicSender')">发送的Topic</button>
        <br>
        <span id="status"></span>
    </div>
</body>
</html>

运行截图:

ActiveMQ 实现队列消息和PUB/SUB模型

ActiveMQ 实现队列消息和PUB/SUB模型

消息发送采用的是Spring提供的JmsTemplate,采用的是异步操作的方式,将消息发布到队列中,消费者可以随时接受这条消息。

从上图可以看出队列模型和PUB/SUB模型的区别,Queue只能由一个消费者接收,其他Queue中的成员无法接受到被已消费的信息,而Topic则可以,只要是订阅了Topic的消费者,全部可以获取到生产者发布的信息。

 

1    queue与topic的技术特点对比

 

          Topic和queue的最大区别在于topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。

 

Topic

Queue

概要

Publish Subscribe messaging 发布订阅消息

Point-to-Point 点对点

有无状态

topic数据默认不落地,是无状态的。

Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。

完整性保障

并不保证publisher发布的每条数据,Subscriber都能接受到。

Queue保证每条数据都能被receiver接收。

消息是否会丢失

一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。

Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。

消息发布接收策略

一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器

一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

2    topic和queue方式的消息处理效率比较

        通过增加监听客户端的并发数来验证,topic的消息推送,是否会因为监听客户端的并发上升而出现明显的下降,测试环境的服务器为ci环境的ActiveMQ,客户端为我的本机。

        从实测的结果来看,topic方式发送的消息,发送和接收的效率,在一个订阅者和100个订阅者的前提下没有明显差异,但在500个订阅者(线程)并发的前提下,效率差异很明显(由于500线程并发的情况下,我本机的cpu占用率已高达70-90%,所以无法确认是我本机测试造成的性能瓶颈还是topic消息发送方式存在性能瓶颈,造成效率下降如此明显)。

        Topic方式发送的消息与queue方式发送的消息,发送和接收的效率,在一个订阅者和100个订阅者的前提下没有明显差异,但在500个订阅者并发的前提下,topic方式的效率明显低于queue。

        Queue方式发送的消息,在一个订阅者、100个订阅者和500个订阅者的前提下,发送和接收的效率没有明显变化。

Topic实测数据:

 

 

发送者发送的消息总数

所有订阅者接收到消息的总数

消息发送和接收平均耗时

单订阅者

100

100

101ms

100订阅者

100

10000

103ms

500订阅者

100

50000

14162ms

 

Queue实测数据:

 

 

发送者发送的消息总数

所有订阅者接收到消息的总数

消息发送和接收平均耗时

单订阅者

100

100

96ms

100订阅者

100

100

96ms

500订阅者

100

100

100ms

 

3     topic方式的消息处理示例
3.1     通过客户端代码调用来发送一个topic的消息:

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

 

publicclass SendTopic {

    privatestaticfinalint SEND_NUMBER = 5;

    publicstaticvoid sendMessage(Session session, MessageProducer producer)

            throws Exception {

         for ( int i = 1; i <= SEND_NUMBER; i++) {

            TextMessage message = session

                    .createTextMessage("ActiveMq发送的消息" + i);

            //发送消息到目的地方

            System. out.println("发送消息:" + "ActiveMq 发送的消息" + i);

            producer.send(message);

        }

    }

   

    publicstaticvoid main(String[] args) {

        // ConnectionFactory:连接工厂,JMS用它创建连接

        ConnectionFactory connectionFactory;

        // Connection:JMS客户端到JMS Provider的连接

        Connection connection = null;

        // Session:一个发送或接收消息的线程

        Session session;

        // Destination:消息的目的地;消息发送给谁.

        Destination destination;

        // MessageProducer:消息发送者

        MessageProducer producer;

        // TextMessage message;

        //构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar

        connectionFactory = new ActiveMQConnectionFactory(

                ActiveMQConnection. DEFAULT_USER,

                ActiveMQConnection. DEFAULT_PASSWORD,

                "tcp://10.20.8.198:61616");

        try {

            //构造从工厂得到连接对象

            connection = connectionFactory.createConnection();

            //启动

            connection.start();

            //获取操作连接

            session = connection.createSession( true, Session. AUTO_ACKNOWLEDGE);

            //获取session注意参数值FirstTopic是一个服务器的topic(与queue消息的发送相比,这里是唯一的不同)

            destination = session.createTopic("FirstTopic");

            //得到消息生成者【发送者】

            producer = session.createProducer(destination);

            //设置不持久化,此处学习,实际根据项目决定

            producer.setDeliveryMode(DeliveryMode. PERSISTENT);

            //构造消息,此处写死,项目就是参数,或者方法获取

            sendMessage(session, producer);

            session.commit();

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            try {

                if ( null != connection)

                    connection.close();

            } catch (Throwable ignore) {

            }

        }

    }

}

 

3.2     启动多个客户端监听来接收topic的消息:

publicclass ReceiveTopic implements Runnable {

      private StringthreadName;

 

      ReceiveTopic(String threadName) {

           this.threadName = threadName;

      }

 

      publicvoid run() {

           // ConnectionFactory:连接工厂,JMS用它创建连接

           ConnectionFactory connectionFactory;

           // Connection:JMS客户端到JMS Provider的连接

           Connection connection = null;

           // Session:一个发送或接收消息的线程

           Session session;

           // Destination:消息的目的地;消息发送给谁.

           Destination destination;

           //消费者,消息接收者

           MessageConsumer consumer;

           connectionFactory = new ActiveMQConnectionFactory(

                      ActiveMQConnection. DEFAULT_USER,

                      ActiveMQConnection. DEFAULT_PASSWORD,"tcp://10.20.8.198:61616");

           try {

                 //构造从工厂得到连接对象

                 connection = connectionFactory.createConnection();

                 //启动

                 connection.start();

                 //获取操作连接,默认自动向服务器发送接收成功的响应

                 session = connection.createSession( false, Session. AUTO_ACKNOWLEDGE);

                 //获取session注意参数值FirstTopic是一个服务器的topic

                 destination = session.createTopic("FirstTopic");

                 consumer = session.createConsumer(destination);

                 while ( true) {

                      //设置接收者接收消息的时间,为了便于测试,这里设定为100s

                      TextMessage message = (TextMessage) consumer

                                  .receive(100 * 1000);

                      if ( null != message) {

                            System. out.println("线程"+threadName+"收到消息:" + message.getText());

                      } else {

                            continue;

                      }

                 }

           } catch (Exception e) {

                 e.printStackTrace();

           } finally {

                 try {

                      if ( null != connection)

                            connection.close();

                 } catch (Throwable ignore) {

                 }

           }

      }

 

      publicstaticvoid main(String[] args) {

            //这里启动3个线程来监听FirstTopic的消息,与queue的方式不一样三个线程都能收到同样的消息

           ReceiveTopic receive1= new ReceiveTopic("thread1");

           ReceiveTopic receive2= new ReceiveTopic("thread2");

           ReceiveTopic receive3= new ReceiveTopic("thread3");

           Thread thread1= new Thread(receive1);

           Thread thread2= new Thread(receive2);

           Thread thread3= new Thread(receive3);

           thread1.start();

           thread2.start();

           thread3.start();

      }

}

相关文章:

  • 2022-12-23
  • 2021-08-08
  • 2021-12-10
  • 2021-07-05
  • 2021-04-15
  • 2021-12-15
  • 2021-09-20
  • 2021-05-07
猜你喜欢
  • 2021-07-08
  • 2022-01-10
  • 2022-01-06
  • 2022-01-01
  • 2021-10-15
  • 2021-08-14
  • 2022-12-23
相关资源
相似解决方案