【问题标题】:Detect change in Consumers of an ActiveMQ topic检测 ActiveMQ 主题的消费者变化
【发布时间】:2017-09-05 06:20:50
【问题描述】:

我有一个 tomcat 集群,它们使用来自 ActiveMQ 主题的消息。现在,如果集群中的一个 tomcat 出现故障,那么我猜消费者的数量会下降 1。

现在,我想使用有关该主题的一些回调或侦听器来检测该更改。这可行吗?

类似:Region.getDestinations(ActiveMQDestination) 会起作用吗?

【问题讨论】:

    标签: java jms activemq


    【解决方案1】:

    咨询信息是您所需要的。

    每次您收到带有此代码的消息时,这意味着您有消费者开始或停止。

    文档http://activemq.apache.org/advisory-message.html

    示例:

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ActiveMQSession;
    import org.apache.activemq.command.ActiveMQMessage;
    import org.apache.activemq.command.ConsumerInfo;
    import org.apache.activemq.command.RemoveInfo;
    
    public class AdvisorySupportConsumerAdvisoryTopic {
    
        public static void main(String[] args) throws JMSException {
            Connection conn = null;
            try {
                ConnectionFactory cf = new ActiveMQConnectionFactory("auto://localhost:5671");
                conn = cf.createConnection("admin", "admin");
                ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                        ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
                conn.start();
                Queue q = session.createQueue("Q");
                Destination advisoryDestination = org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(q);
                MessageConsumer consumer = session.createConsumer(advisoryDestination);
                consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message msg) {
                        if (msg instanceof ActiveMQMessage) {
                            try {
                                ActiveMQMessage aMsg = (ActiveMQMessage) msg;
                                System.out.println(aMsg.getStringProperty("consumerCount"));
                                System.out.println(aMsg.getStringProperty("producerCount"));
                                if (aMsg.getDataStructure() instanceof ConsumerInfo) {
                                    // Consumer start
                                    ConsumerInfo consumerInfo = (ConsumerInfo) aMsg.getDataStructure();
                                    System.out.println(consumerInfo);
                                } else if (aMsg.getDataStructure() instanceof RemoveInfo) {
                                    // Consumer stop
                                    RemoveInfo removeInfo = (RemoveInfo) aMsg.getDataStructure();
                                    System.out.println(removeInfo);
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (Exception e) {
                    }
                }
            }
        }
    }
    

    每次您收到带有此代码的消息时,这意味着您有连接开始或停止。

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ActiveMQSession;
    import org.apache.activemq.command.ActiveMQMessage;
    import org.apache.activemq.command.ConnectionInfo;
    import org.apache.activemq.command.RemoveInfo;
    
    public class AdvisorySupportConnectionAdvisoryTopic {
    
        public static void main(String[] args) throws JMSException {
            Connection conn = null;
            try {
                ConnectionFactory cf = new ActiveMQConnectionFactory("auto://localhost:5671");
                conn = cf.createConnection("admin", "admin");
                ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                        ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
                conn.start();
                Destination advisoryDestination = org.apache.activemq.advisory.AdvisorySupport.getConnectionAdvisoryTopic();
                MessageConsumer consumer = session.createConsumer(advisoryDestination);
                consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message msg) {
                        if (msg instanceof ActiveMQMessage) {
                            try {
                                ActiveMQMessage aMsg = (ActiveMQMessage) msg;
                                if (aMsg.getDataStructure() instanceof ConnectionInfo) {
                                    // Connection start
                                    ConnectionInfo connectionInfo = (ConnectionInfo) aMsg.getDataStructure();
                                    System.out.println(connectionInfo);
                                } else if (aMsg.getDataStructure() instanceof RemoveInfo) {
                                    // Connection stop
                                    RemoveInfo removeInfo = (RemoveInfo) aMsg.getDataStructure();
                                    System.out.println(removeInfo);
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (Exception e) {
                    }
                }
            }
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2017-02-14
      • 1970-01-01
      • 2014-05-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-07-09
      • 2014-08-06
      • 2020-02-28
      相关资源
      最近更新 更多