【问题标题】:How to enqueue a JMS message into Oracle AQ using Java如何使用 Java 将 JMS 消息排入 Oracle AQ
【发布时间】:2013-12-28 03:25:02
【问题描述】:

我有一个队列类型为 SYS.AQ$_JMS_TEXT_MESSAGE 的 Oracle AQ。我正在尝试做的是从 java 应用程序将文本插入到提到的队列中。

等效的 SQL 查询是

declare
 r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
 r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
 v_message_handle     RAW(16);
 o_payload            SYS.AQ$_JMS_TEXT_MESSAGE;
begin
 o_payload := sys.aq$_jms_text_message.construct;
 o_payload.set_text(xmltype('<user>text</user>').getClobVal());
 sys.dbms_aq.enqueue (
   queue_name         => 'QUEUE_NAME',
   enqueue_options    => r_enqueue_options,
   message_properties => r_message_properties,
   payload            => o_payload,
   msgid              => v_message_handle
 );
 commit;
end;
/

我使用 this guide 完成了大部分操作,但我被困在了

 o_payload := sys.aq$_jms_text_message.construct;
 o_payload.set_text(xmltype('<user>text</user>').getClobVal());

该指南展示了如何将 RAW 消息入队,但我需要它是 JMS,否则数据类型与队列类型不匹配。

任何帮助将不胜感激,因为即使使用全能的谷歌我也无法找到解决此问题的方法。有没有办法使用oracle.jdbc.aq 类来做到这一点,还是我只需要吸收它并使用 SQL 查询?

【问题讨论】:

    标签: java jms oracle-aq


    【解决方案1】:

    只需复制粘贴此代码并尝试。 (如果它适合你)然后仔细阅读代码,并理解。

    执行时,

    • 首先取消注释主方法中的“createQueue()”行。

    之后,

    • 评论它并取消评论“sendMessage()”行并尝试发送您的消息。

    然后分别注释/取消注释每一行并尝试一下。

    import java.util.ArrayList;
    import java.util.Enumeration;
    import java.util.List;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.QueueBrowser;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import oracle.AQ.AQQueueTable;
    import oracle.AQ.AQQueueTableProperty;
    import oracle.jms.AQjmsDestination;
    import oracle.jms.AQjmsDestinationProperty;
    import oracle.jms.AQjmsFactory;
    import oracle.jms.AQjmsSession;
    
    public class OracleAQClient {
    
    public static QueueConnection getConnection() {
    
        String hostname = "localhost";
        String oracle_sid = "xe";
        int portno = 1521;
        String userName = "jmsuser";
        String password = "jmsuser";
        String driver = "thin";
        QueueConnectionFactory QFac = null;
        QueueConnection QCon = null;
        try {
            // get connection factory , not going through JNDI here
            QFac = AQjmsFactory.getQueueConnectionFactory(hostname, oracle_sid, portno, driver);
            // create connection
            QCon = QFac.createQueueConnection(userName, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return QCon;
    }
    
    public static void createQueue(String user, String qTable, String queueName) {
        try {
            /* Create Queue Tables */
            System.out.println("Creating Queue Table...");
            QueueConnection QCon = getConnection();
            Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
    
            AQQueueTableProperty qt_prop;
            AQQueueTable q_table = null;
            AQjmsDestinationProperty dest_prop;
            Queue queue = null;
            qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESSAGE");
    
            q_table = ((AQjmsSession) session).createQueueTable(user, qTable, qt_prop);
    
            System.out.println("Qtable created");
            dest_prop = new AQjmsDestinationProperty();
            /* create a queue */
            queue = ((AQjmsSession) session).createQueue(q_table, queueName, dest_prop);
            System.out.println("Queue created");
            /* start the queue */
            ((AQjmsDestination) queue).start(session, true, true);
    
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }
    }
    
    public static void sendMessage(String user, String queueName,String message) {
    
        try {
            QueueConnection QCon = getConnection();
            Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
            QCon.start();
            Queue queue = ((AQjmsSession) session).getQueue(user, queueName);
            MessageProducer producer = session.createProducer(queue);
            TextMessage tMsg = session.createTextMessage(message);
    
            //set properties to msg since axis2 needs this parameters to find the operation
            tMsg.setStringProperty("SOAPAction", "getQuote");
            producer.send(tMsg);
            System.out.println("Sent message = " + tMsg.getText());
    
            session.close();
            producer.close();
            QCon.close();
    
        } catch (JMSException e) {
            e.printStackTrace();
            return;
        }
    }
    
    public static void browseMessage(String user, String queueName) {
        Queue queue;
        try {
            QueueConnection QCon = getConnection();
            Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
    
            QCon.start();
            queue = ((AQjmsSession) session).getQueue(user, queueName);
            QueueBrowser browser = session.createBrowser(queue);
            Enumeration enu = browser.getEnumeration();
            List list = new ArrayList();
            while (enu.hasMoreElements()) {
                TextMessage message = (TextMessage) enu.nextElement();
                list.add(message.getText());
            }
            for (int i = 0; i < list.size(); i++) {
                System.out.println("Browsed msg " + list.get(i));
            }
            browser.close();
            session.close();
            QCon.close();
    
        } catch (JMSException e) {
            e.printStackTrace();
        }
    
    }
    
    public static void consumeMessage(String user, String queueName) {
        Queue queue;
        try {
            QueueConnection QCon = getConnection();
            Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
            QCon.start();
            queue = ((AQjmsSession) session).getQueue(user, queueName);
            MessageConsumer consumer = session.createConsumer(queue);
            TextMessage msg = (TextMessage) consumer.receive();
            System.out.println("MESSAGE RECEIVED " + msg.getText());
    
            consumer.close();
            session.close();
            QCon.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String args[]) {
        String userName = "jmsuser";
        String queue = "sample_aq";
        String qTable = "sample_aqtbl";
        //createQueue(userName, qTable, queue);
        //sendMessage(userName, queue,"<user>text</user>");
        //browseMessage(userName, queue);
        //consumeMessage(userName, queue);
    }
    

    }

    您需要将这些 jars/libs 从您的 oracle DB 安装目录复制到您的 java 项目中

    1. ojdbc6.jar
    2. jta.jar
    3. jmscommon.jar
    4. aqapi.jar

    本文[1] 应归功于 Ratha。需要修改的东西很少,我只是修改并提供了代码。

    [1]http://wso2.com/library/tutorials/2011/11/configuring-wso2-esb-with-oracle-as-messaging-media/

    谢谢

    【讨论】:

      【解决方案2】:

      我会在@Chathura Kulasinghe 的回答中添加一些花絮。

      首先,在consumeMessage方法中,使用

      Session.CLIENT_ACKNOWLEDGE

      用于创建会话对象的参数将具有将您消费的消息留在队列中的效果。如果你多次运行这个程序,你会看到队列的数据库表中消息的数量在增加。要删除消息,您需要通过在消息对象上调用此方法来“确认”它:

      msg.acknowledge();

      其次,如果您希望会话为您执行此操作,只需将客户端确认模式更改为:

      Session.AUTO_ACKNOWLEDGE

      使用此参数,每次调用您的 consumer.receive() 时,它都会自动确认,因此会从队列中删除。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2011-05-02
        • 1970-01-01
        • 2011-05-22
        • 1970-01-01
        • 1970-01-01
        • 2012-03-30
        • 2014-03-31
        • 2020-05-27
        相关资源
        最近更新 更多