【问题标题】:Synchronous Consumer with JMS Queue使用 JMS 队列的同步消费者
【发布时间】:2012-10-22 15:02:59
【问题描述】:

我想以同步方式处理来自 Glassfish 3 中 JMS 队列的所有消息,因此我尝试在 Glassfish 窗口中的 JMS Physical Destination 中将属性 Maximum Active Consumers 从 -1 更改为 1。我认为设置这个我将只有一个消费者同时执行 OnMessage()。我遇到的问题是,当我更改该属性时出现此错误:

[I500]: Caught JVM Exception: org.xml.sax.SAXParseException: Content is not allowed in prolog.

[I500]: Caught JVM Exception: com.sun.messaging.jms.JMSException: Content is not allowed in prolog.

sendMessage Error [C4038]: com.sun.messaging.jms.JMSException: Content is not allowed in prolog.

如果有人知道使方法 onmessage() 同步的另一种方法,将不胜感激。这是我的消费类:

@MessageDriven(mappedName = "QueueListener", activationConfig = {
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
})
public class MessageBean implements MessageListener {



@Override
public void onMessage(Message message) {
    long t1 = System.currentTimeMillis();
    write("MessageBean has received " + message);
    try{

        TextMessage result=(TextMessage)message;
        String text=result.getText();
        write("OTAMessageBean message ID has resolved to " + text);
        int messageID=Integer.valueOf(text);

        AirProcessing aP=new AirProcessing();
        aP.pickup(messageID);


    }
    catch(Exception e){
        raiseError("OTAMessageBean error " + e.getMessage());
    }
   long t2 = System.currentTimeMillis();
   write("MessageBean has finished in " + (t2-t1)); 

}



}

【问题讨论】:

    标签: java glassfish jms


    【解决方案1】:

    我遇到了同样的问题,我找到的唯一解决方案是设置一个Schedule,它每十秒轮询一次队列中的消息:

    @Stateless
    public class MyReceiver {
       @Resource(mappedName = "jms/MyQueueFactory")
       private QueueConnectionFactory connectionFactory;
       @Resource(mappedName = "jms/MyQueue")
       private Queue myQueue;
       private QueueConnection qc;
       private QueueSession session;
       private MessageConsumer consumer;
    
    
       @PostConstruct
       void init() {
           try {
             qc = connectionFactory.createQueueConnection();
             session = qc.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
             consumer = session.createConsumer(myQueue);
             qc.start();
           } catch (JMSException e) {
             throw new RuntimeException(e);
           }
       }
    
       @PreDestroy
       void cleanup() throws JMSException {
         qc.close();
       }
    
       @Schedule(hour = "*", minute = "*", second = "*/10", persistent = false)
       public void onMessage() throws JMSException {
         Message message;
         while ((message = consumer.receiveNoWait()) != null) {
           ObjectMessage objMsg = (ObjectMessage) message;
           Serializable content;
           try {
             content = objMsg.getObject();
    
             //Do sth. with "content" here
    
             message.acknowledge();
           } catch (JMSException ex) {
             ex.printStackTrace();
           }
        }
      }
    }
    

    【讨论】:

      【解决方案2】:

      JMS 本质上是异步的,您没有特定的配置来告诉 io 同步运行。您可以通过在各处添加消息传递和消费确认来模拟它,但这并不是 JMS 真正的工作方式。尝试 RMIenter link description here 或 HTTP(或其他诸如 SOAP 或 REST Web 服务之类的东西)

      【讨论】:

      • 我想要一个独特的单线程消费者。我遇到的问题是 onMessage() 对收到的每条消息都执行异步操作,所以如果我有 10 个生产者每个发送一条消息,我将同时有 10 个消费者,所以队列“总是空的”。队列的理论目的是保存消息(以 FIFO 或 LIFO 方式),直到任何消费者获得并处理它。如果队列创建了无限的消费者线程,那么使用队列有什么好处?
      • 嗯,正如你所说,JMS 是一种生产者-消费者范式实现,它是一种以异步方式运行(生产/消费)的方式,即生产者和消费者都不是需要彼此等待完成。这样做有好处,比如当“消费”部分仅在生产者完成后才可用时,不会无用地阻塞。无论如何,JMS 可以同步或异步使用,但恕我直言,它并不是真正为同步操作而设计的(至少因为与实现此目的的其他方法相比,它意味着开销)
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-10-23
      • 2013-08-08
      • 2015-08-03
      • 2021-02-18
      • 1970-01-01
      • 2018-01-28
      • 1970-01-01
      相关资源
      最近更新 更多