【问题标题】:How to get number of pending message in a jms queue如何获取 jms 队列中未决消息的数量
【发布时间】:2017-09-03 15:19:26
【问题描述】:

有什么方法可以获取 jms 队列中未决消息的计数。如果队列中没有剩余要处理的消息,我的目标是关闭连接。我怎样才能做到这一点。

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
    Connection connection = connectionFactory.createConnection("admin", "admin");
    connection.start();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    Destination destination = session.createQueue(subject);

    MessageConsumer consumer = session.createConsumer(destination);

    while (true) {
        Message message = consumer.receive();

        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message:: '" + textMessage.getText() + "'");
        }
    }

【问题讨论】:

  • 嗨赫尔曼,你得到了你的问题解决方案,实际上我遇到了同样的问题,我不想使用循环来计算待处理消息的总数。您是否找到任何解决方案来获取待处理消息的数量?
  • @BhaumikSathvara 请看stackoverflow.com/a/43292110/3570413

标签: jms activemq


【解决方案1】:

我用 jmx 完成了这个,它比 @Tim Bish 有效

这是我更新的代码

JMXServiceURL url = new JMXServiceURL("service:jmx:rmi://0.0.0.0:44444/jndi/rmi://0.0.0.0:1099/karaf-root");

HashMap<String, String[]> environment = new HashMap<String, String[]>();
String[] creds = { "admin", "admin" };
environment.put(JMXConnector.CREDENTIALS, creds);

JMXConnector jmxc = JMXConnectorFactory.connect(url, environment);
MBeanServerConnection connection = jmxc.getMBeanServerConnection();

ObjectName nameConsumers = new ObjectName("org.apache.activemq:type=Broker,brokerName=amq,destinationType=Queue,destinationName=myqueue");
DestinationViewMBean mbView = MBeanServerInvocationHandler.newProxyInstance(connection, nameConsumers, DestinationViewMBean.class, true);
long queueSize = mbView.getQueueSize();
System.out.println(queueSize);

【讨论】:

    【解决方案2】:

    如果您的 JMS 消息为空,只需中断循环并关闭连接..

       while (true) {
        Message message = consumer.receive(2000);
        if (message == null){
            break;
        }
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message:: '" + textMessage.getText() + "'");
         }
       }
      connection.close();
    

    【讨论】:

    • 这不起作用,因为默认情况下 consumer.receive() 是阻塞的,这意味着它将等待消息到达队列。
    【解决方案3】:

    从代理获取真实队列计数的唯一可靠方法是使用队列的JMX MBean 并调用getQueueSize 方法。

    另一种编程方式是使用Statistics Broker Plugin,这需要您能够更改代理配置才能安装它。安装后,您可以向控制队列发送一条特殊消息,并获得包含您要监控的目标详细信息的响应。

    使用 QueueBrowser 不会给你一个真实的计数,因为浏览器对它会分页到内存中发送给你的消息数量有一个最大限制,所以如果你的队列比限制更深,你就不会得到实际的size,只是最大页面大小限制的值。

    【讨论】:

    • 感谢您的回答。我最近开始研究 Jboss fuse,所以我不知道。
    【解决方案4】:

    我已经通过使用 createBrowser 方法完成此操作,下面是我更新的代码。

    public static void main(String[] args) throws JMSException {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
    Connection connection = connectionFactory.createConnection("admin", "admin");
    connection.start();
    
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    Destination destination = session.createQueue(subject);
    int queueSize = QueueConsumer.getQueueSize(session, (Queue) destination);
    System.out.println("QUEUE SIZE: " + queueSize);
    MessageConsumer consumer = session.createConsumer(destination);
    
    for (int i = 0; i < queueSize; i++) {
        Message message = consumer.receive();
    
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incomming Message: '" + textMessage.getText() + "'");
        }
    }
    connection.close();
    }
    
    private int getQueueSize(Session session, Queue queue) {
        int count = 0;
        try {
            QueueBrowser browser = session.createBrowser(queue);
            Enumeration elems = browser.getEnumeration();
            while (elems.hasMoreElements()) {
                elems.nextElement();
                count++;
            }
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
        return count;
    }
    

    【讨论】:

    • 完美答案@herman +1。弹簧靴也有挂钩吗?
    • 此方法只返回一个不超过 ActiveMQ "maxBrowsePageSize" 配置值的计数,除此之外的任何值,即使队列深度大于该数量,也不能保证获得更多消息
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2010-12-08
    • 1970-01-01
    • 1970-01-01
    • 2012-11-16
    • 1970-01-01
    • 2012-03-20
    • 1970-01-01
    相关资源
    最近更新 更多