【问题标题】:Delay while consuming messages from JMS Topic消费来自 JMS 主题的消息时延迟
【发布时间】:2012-01-24 20:22:30
【问题描述】:

我有一个长期订阅者的主题。我可以发布和使用消息,但是我发现从主题中读取消息时存在一些延迟。

我无法在一次通话中阅读这些消息。我需要多次调用该方法来阅读消息。我错过了什么吗?

    private void publishMessage() {
        TopicConnection topicConnection = null;
        TopicSession topicSession = null;
        TopicPublisher topicPublisher = null;
        try {
          topicConnection = connectionFactory.createTopicConnection();
          topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
          Topic topicName= topicSession.createTopic(topicName);
          topicPublisher = topicSession.createPublisher(topicName);
          ObjectMessage message = topicSession.createObjectMessage(customObject)
          message.setStringProperty("user", userProperty);
          topicPublisher.publish(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, timeToLive);
        } catch (JMSException e) {
          throw new RuntimeException("Error Sending UMessage", e);
        } finally {
          closeConnections(null, topicPublisher, topicSession, topicConnection);
        }
    }

public void consumeMessages(String userId, int maxResults) {
    TopicConnection topicConnection = null;
    TopicSession topicSession = null;
    TopicSubscriber topicSubscriber = null;

    try {
      topicConnection = connectionFactory.createTopicConnection("guest","guest");
      topicConnection.setClientID("topic");
      topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
      Topic topicName= topicSession.createTopic(topicName);
      topicSubscriber = topicSession.createDurableSubscriber(topicName, "subscriptionname", String.format("user = '%s'", userName), false);
      topicConnection.start();
      Message msg = null;

      do {
        msg = topicSubscriber.receiveNoWait();
        if (msg instanceof ObjectMessage) {
          ObjectMessage om = (ObjectMessage) msg;
         else {
            log.error(String.format(" %s", om.getObject().getClass().getSimpleName()));
          }
        } else if (msg != null) {
          log.error(String.format("e %s", msg.getClass().getSimpleName()));
        }
      } while (msg != null && out.size() <= maxResults);
    } catch (JMSException e) {
      throw new RuntimeException("Error retrieving User Messages", e);
    } finally {
      closeConnections(topicSubscriber, null, topicSession, topicConnection);
    }
    return out;
}

【问题讨论】:

    标签: java jms messaging


    【解决方案1】:

    您正在调用 receiveNoWait(),它将一次检索一条消息。通常情况下,取决于您的 JMS 提供者,JMS 客户端将一次检索多个消息并将它们缓存在客户端以减少网络延迟。当您调用接收时,它会从此缓存中获取消息并将其提供给您。

    如果您看到一分钟的延迟,可能是您将这些消息放到主题上的方式有问题,或者您在处理每条消息时阻止了您的消息接收。如果您不想在处理时阻止消息接收,请查看实现MessageListener 接口而不是使用接收方法,或者您可以从接收方法中获取消息并在线程池上异步处理它们。

    当你创建消费者时,你可以像这样添加监听器:

    MessageListener listener = new MyListener();
    consumer.setMessageListener(listener);
    

    然后创建一个类来处理消息或在您现有的消费者类中实现接口:

    public class MyListener implements MessageListener {
      public void onMessage(Message message)
      {
         TextMessage text = (TextMessage) message;
    
         System.out.println("Message: " + text.getText());
      }
    }
    

    【讨论】:

    • 是的,我想检索订阅者的所有可用消息。这就是循环指定消息数量的原因。它正在阅读所有消息,但不是在发布后立即阅读,需要 5-6 分钟(有时甚至更多)。如果我在调试模式下运行我的 jboss,并且如果我有一些断点,它会立即读取消息,我观察到一种奇怪的行为。但是,如果在正常模式下运行,则需要一些时间来阅读。
    • 既然你提到了 JBoss,我假设你在应用服务器内部使用 JBoss Messaging 作为你的 JMS 提供者?
    • 是的,你说得对。我正在使用 JBOSS 提供的 JMS 服务器。我是否缺少任何配置?
    • 我认为您的代码很好,但我认为您对接收工作方式的期望不正确。查看我的编辑。
    • 感谢 gregory 的投入。实际上我不想使用 MessageListener 或 MDB,因为如果我使用侦听器,则消息一发布到主题就会被使用。我想一口气阅读特定订阅者的所有消息。(通过使用消息选择器)。就像向订阅者发布消息并稍后阅读所有消息一样。
    猜你喜欢
    • 2012-05-17
    • 1970-01-01
    • 1970-01-01
    • 2019-04-21
    • 1970-01-01
    • 2012-01-28
    • 2018-06-08
    • 1970-01-01
    • 2023-02-09
    相关资源
    最近更新 更多