【问题标题】:JMS message receiver filtering by JMSCorrelationIDJMSorrelationID 过滤 JMS 消息接收方
【发布时间】:2010-09-14 00:34:57
【问题描述】:

如何在 java (JRE /JDK / J2EE 1.4) 中实例化只接收与给定 JMSCorrelationID 匹配的消息的 JMS 队列侦听器?我要获取的消息已发布到队列而不是主题,尽管如果需要可以更改。

这是我目前用来将消息放入队列的代码:
/**
 * publishResponseToQueue publishes Requests to the Queue.
 *
 * @param   jmsQueueFactory             -Name of the queue-connection-factory
 * @param   jmsQueue                    -The queue name for the request
 * @param   response                     -A response object that needs to be published
 * 
 * @throws  ServiceLocatorException     -An exception if a request message
 *                                      could not be published to the Topic
 */
private void publishResponseToQueue( String jmsQueueFactory,
                                    String jmsQueue,
                                    Response response )
        throws ServiceLocatorException {

    if ( logger.isInfoEnabled() ) {
        logger.info( "Begin publishRequestToQueue: " +
                         jmsQueueFactory + "," + jmsQueue + "," + response );
    }
    logger.assertLog( jmsQueue != null && !jmsQueue.equals(""),
                      "jmsQueue cannot be null" );
    logger.assertLog( jmsQueueFactory != null && !jmsQueueFactory.equals(""),
                      "jmsQueueFactory cannot be null" );
    logger.assertLog( response != null, "Request cannot be null" );

    try {

        Queue queue = (Queue)_context.lookup( jmsQueue );

        QueueConnectionFactory factory = (QueueConnectionFactory)
            _context.lookup( jmsQueueFactory );

        QueueConnection connection = factory.createQueueConnection();
        connection.start();
        QueueSession session = connection.createQueueSession( false,
                                    QueueSession.AUTO_ACKNOWLEDGE );

        ObjectMessage objectMessage = session.createObjectMessage();

        objectMessage.setJMSCorrelationID(response.getID());

        objectMessage.setObject( response );

        session.createSender( queue ).send( objectMessage );

        session.close();
        connection.close();

    } catch ( Exception e ) {
        //XC3.2  Added/Modified BEGIN
        logger.error( "ServiceLocator.publishResponseToQueue - Could not publish the " +
                      "Response to the Queue - " + e.getMessage() );
        throw new ServiceLocatorException( "ServiceLocator.publishResponseToQueue " +
                                           "- Could not publish the " +
                      "Response to the Queue - " + e.getMessage() );
        //XC3.2  Added/Modified END
    }

    if ( logger.isInfoEnabled() ) {
        logger.info( "End publishResponseToQueue: " +
                         jmsQueueFactory + "," + jmsQueue + response );
    }

}  // end of publishResponseToQueue method 

【问题讨论】:

    标签: jakarta-ee queue jms


    【解决方案1】:

    希望这会有所帮助。我使用了 Open MQ。

    package com.MQueues;
    
    import java.util.UUID;
    
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.QueueConnection;
    import javax.jms.QueueReceiver;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import com.sun.messaging.BasicQueue;
    import com.sun.messaging.QueueConnectionFactory;
    
    public class HelloProducerConsumer {
    
    public static String queueName = "queue0";
    public static String correlationId;
    
    public static String getCorrelationId() {
        return correlationId;
    }
    
    public static void setCorrelationId(String correlationId) {
        HelloProducerConsumer.correlationId = correlationId;
    }
    
    public static String getQueueName() {
        return queueName;
    }
    
    public static void sendMessage(String threadName) {
        correlationId = UUID.randomUUID().toString();
        try {
    
            // Start connection
            QueueConnectionFactory cf = new QueueConnectionFactory();
            QueueConnection connection = cf.createQueueConnection();
            QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            BasicQueue destination = (BasicQueue) session.createQueue(threadName);
            MessageProducer producer = session.createProducer(destination);
            connection.start();
    
            // create message to send
            TextMessage message = session.createTextMessage();
            message.setJMSCorrelationID(correlationId);
            message.setText(threadName + "(" + System.currentTimeMillis() 
                    + ") " + correlationId +" from Producer");
    
            System.out.println(correlationId +" Send from Producer");
            producer.send(message);
    
            // close everything
            producer.close();
            session.close();
            connection.close();
    
        } catch (JMSException ex) {
            System.out.println("Error = " + ex.getMessage());
        }
    }
    
    public static void receivemessage(final String correlationId) {
        try {
    
            QueueConnectionFactory cf = new QueueConnectionFactory();
            QueueConnection connection = cf.createQueueConnection();
            QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    
            BasicQueue destination = (BasicQueue) session.createQueue(getQueueName());
    
            connection.start();
    
            System.out.println("\n");
            System.out.println("Start listen " + getQueueName() + " " + correlationId +" Queue from receivemessage");
            long now = System.currentTimeMillis();
    
            // receive our message
            String filter = "JMSCorrelationID = '" + correlationId  + "'";
            QueueReceiver receiver = session.createReceiver(destination, filter);
            TextMessage m = (TextMessage) receiver.receive();
            System.out.println("Received message = " + m.getText() + " timestamp=" + m.getJMSTimestamp());
    
            System.out.println("End listen " + getQueueName() + " " + correlationId +" Queue from receivemessage");
    
            session.close();
            connection.close();
    
        } catch (JMSException ex) {
            System.out.println("Error = " + ex.getMessage());
        }
    }
    
    public static void main(String args[]) {
        HelloProducerConsumer.sendMessage(getQueueName());
        String correlationId1 = getCorrelationId();
        HelloProducerConsumer.sendMessage(getQueueName());
        String correlationId2 = getCorrelationId();
        HelloProducerConsumer.sendMessage(getQueueName());
        String correlationId3 = getCorrelationId();
    
    
        HelloProducerConsumer.receivemessage(correlationId2);
    
        HelloProducerConsumer.receivemessage(correlationId1);
    
        HelloProducerConsumer.receivemessage(correlationId3);
    }
    }
    

    【讨论】:

      【解决方案2】:
      String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'";
      QueueReceiver receiver = session.createReceiver(queue, filter);
      

      这里接收者将收到JMSCorrelationID 等于MessageID 的消息。这在请求/响应范例中非常有帮助。

      或者您可以直接将其设置为任何值:

      QueueReceiver receiver = session.createReceiver(queue,  "JMSCorrelationID ='"+id+"'";);
      

      你可以做到receiver.receive(2000);receiver.setMessageListener(this);

      【讨论】:

        【解决方案3】:

        顺便说一句,虽然这不是您提出的实际问题 - 如果您尝试通过 JMS 实现请求响应,我建议您使用 reading this article,因为 JMS API 比您想象的要复杂得多,并且有效地执行此操作要多得多比看起来更难。

        尤其是to use JMS efficiently,您应该尽量避免为单个消息等创建消费者。

        另外,由于 JMS API 非常复杂,无法正确有效地使用 - 特别是在池、事务和并发处理方面 - 我推荐人们 hide the middleware from their application code,例如通过使用 Apache Camel's Spring Remoting implementation for JMS

        【讨论】:

        • 如果我几年前知道 Camel,我会为自己节省很多车轮改造。
        【解决方案4】:

        队列连接设置是相同的,但是一旦有了 QueueSession,就可以在创建接收器时设置选择器。

            QueueReceiver receiver = session.createReceiver(myQueue, "JMSCorrelationID='theid'");
        

        然后

        receiver.receive()
        

        receiver.setListener(myListener);
        

        【讨论】:

        • 我最近一直在阅读同一个主题,并且有一个问题如下:接收者是否仍会收到那些不包含所需关联 ID 的消息并在不处理的情况下静默丢弃它们,或者JMS 提供者本身是否不会将此类消息传递给接收者,以便它们仍然保留在队列中?我觉得后者是正确的做法,但想验证一下。谢谢。
        • @Robin ... 在这种情况下,过滤条件是 "JMSCorrelationID='theid'" 。我需要再添加一个条件,例如“JMSCorrelationID='theid'”和“Location=ASIA”。这个多重过滤条件的语法是什么
        猜你喜欢
        • 2015-04-03
        • 1970-01-01
        • 2012-01-19
        • 1970-01-01
        • 2011-12-12
        • 2023-03-19
        • 2012-08-12
        • 2014-12-30
        • 1970-01-01
        相关资源
        最近更新 更多