【问题标题】:IBM MQ Message ListenerIBM MQ 消息侦听器
【发布时间】:2010-12-04 07:34:54
【问题描述】:

您好,有人知道如何使用 IBM MQ 创建消息侦听器吗?我知道如何使用 JMS 规范来做到这一点,但我不确定如何为 IBM MQ 做到这一点。非常感谢任何链接或指针。

【问题讨论】:

  • 如果您知道如何为 JMS 做这件事,为什么还需要使其特定于 MQ?
  • 你能告诉我如何使用 JMS 连接到 IBM MQ,比如我想知道如何指定队列管理器、通道等

标签: java jms ibm-mq


【解决方案1】:

查看 IBM 帮助:Writing WebSphere MQ base Java applications

IBM 有一个用于与队列交互的 API。这是他们的示例:

import com.ibm.mq.*;            // Include the WebSphere MQ classes for Java package


public class MQSample
{
  private String qManager = "your_Q_manager";  // define name of queue
                                               // manager to connect to.
  private MQQueueManager qMgr;                 // define a queue manager
                                               // object
  public static void main(String args[]) {
     new MQSample();
  }

  public MQSample() {
   try {

      // Create a connection to the queue manager

      qMgr = new MQQueueManager(qManager);

      // Set up the options on the queue we wish to open...
      // Note. All WebSphere MQ Options are prefixed with MQC in Java.

      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF |
                        MQC.MQOO_OUTPUT ;

      // Now specify the queue that we wish to open,
      // and the open options...

      MQQueue system_default_local_queue =
              qMgr.accessQueue("SYSTEM.DEFAULT.LOCAL.QUEUE",
                               openOptions);

      // Define a simple WebSphere MQ message, and write some text in UTF format..

      MQMessage hello_world = new MQMessage();
      hello_world.writeUTF("Hello World!");

      // specify the message options...

      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the // defaults,
                                                           // same as MQPMO_DEFAULT

      // put the message on the queue

      system_default_local_queue.put(hello_world,pmo);

      // get the message back again...
      // First define a WebSphere MQ message buffer to receive the message into..

      MQMessage retrievedMessage = new MQMessage();
      retrievedMessage.messageId = hello_world.messageId;

      // Set the get message options...

      MQGetMessageOptions gmo = new MQGetMessageOptions(); // accept the defaults
                                                           // same as  MQGMO_DEFAULT
      // get the message off the queue...

      system_default_local_queue.get(retrievedMessage, gmo);

      // And prove we have the message by displaying the UTF message text

      String msgText = retrievedMessage.readUTF();
      System.out.println("The message is: " + msgText);
      // Close the queue...
      system_default_local_queue.close();
      // Disconnect from the queue manager

      qMgr.disconnect();
    }
      // If an error has occurred in the above, try to identify what went wrong
      // Was it a WebSphere MQ error?
    catch (MQException ex)
    {
      System.out.println("A WebSphere MQ error occurred : Completion code " +
                         ex.completionCode + " Reason code " + ex.reasonCode);
    }
      // Was it a Java buffer space error?
    catch (java.io.IOException ex)
    {
      System.out.println("An error occurred whilst writing to the message buffer: " + ex);
    }
  }
} // end of sample

我不确定 IBM jar 是否位于基础 Maven 存储库中。我知道过去我必须从本地 IBM 安装中提取它们并将它们放入本地 SVN 存储库中。我正在使用以下罐子:

<dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mq</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>
    <dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mq.pcf</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>
    <dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mqbind</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>
    <dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mqjms</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>

【讨论】:

  • 您好,感谢您的回复。我已经检查了这些站点,但我仍然没有找到使用异步消息侦听器的示例。有任何想法吗?谢谢。
  • 我们创建一个线程,每隔 X 秒在位于队列管理器中的队列上查找消息。此外,如果这是您真正需要的内容,您可能需要编辑原始问题。
【解决方案2】:

看看上面提供的示例。

特别是在行

MQGetMessageOptions gmo = new MQGetMessageOptions();       
system_default_local_queue.get(retrievedMessage, gmo);

您可以将 get 配置为在引发 MQRC_NO_MSG_AVAILABLE 异常之前等待指定时间。或者你可以永远等待。

gmo.waitInterval= qTimeout;
gmo.options = MQC.MQGMO_WAIT

因此,您可以创建一个线程来不断寻找新消息,然后将它们传递给处理程序。获取和放置不需要在同一个线程甚至应用程序中。

我希望这有助于回答您的问题。

【讨论】:

    【解决方案3】:

    虽然前面的响应者提到了 WMQ Java API,但 WMQ 也支持 JMS,所以这里有一些资源可以帮助您开始。

    看看这篇文章:IBM WebSphere Developer Technical Journal: Running a standalone Java application on WebSphere MQ V6.0

    另外,如果您已经安装了完整的 WMQ 客户端,而不仅仅是获取 jar,那么您将安装大量示例代码。默认情况下,这些将位于 C:\Program Files\IBM\WebSphere MQ\tools\jms 或 /opt/mqm/samp 中,具体取决于您的平台。

    如果您需要 WMQ 客户端安装媒体,请获取 here。请注意,这是 WMQ v7 客户端,而不是 v6 客户端。它与 v6 QMgr 兼容,但由于 v6 已于 2011 年 9 月结束生命周期,因此您应该在 v7 客户端上进行新的开发,如果可能的话,还应该使用 v7 QMgr。如果双方都是 v7,则有很多功能和性能增强可用。

    如果需要,可以获取产品手册here

    最后,请务必在收到 JMS 异常时打印链接的异常。这不是 WMQ 的事情,而是 JMS 的事情。 Sun 为 JMS 异常提供了多级数据结构,真正有趣的部分通常位于嵌套级别。这没什么大不了的,几行就可以实现:

    try {
      .
      . code that might throw a JMSException
      .
    } catch (JMSException je) {
      System.err.println("caught "+je);
      Exception e = je.getLinkedException();
      if (e != null) {
        System.err.println("linked exception: "+e);
      } else {
        System.err.println("No linked exception found.");
      }
    }
    

    这有助于确定 JMS 错误与传输错误之间的区别。例如,JMS 安全错误可能是 WMQ 2035,或者可能是 JSSE 配置,或者应用程序可能无法访问文件系统中的某些内容。只有其中一个值得花费大量时间来挖掘 WMQ 错误日志,并且只有通过打印链接的异常,您才能判断它是否是那个。

    【讨论】:

      【解决方案4】:

      除了现有答案之外,还有一点很重要:JMS 提供了MessageListener,这是一个允许您以异步回调的形式接收消息的类。

      本机 API 没有等效功能!您必须根据需要重复调​​用 get(...)

      【讨论】:

        【解决方案5】:

        在获取消息之前的循环中,您可以指定如下

        gmo.options = MQC.MQGMO_WAIT
        gmo.waitInterval = MQConstants.MQWI_UNLIMITED;
        

        这使得循环将一直等待,直到队列中有消息。 对我来说,它类似于MessageListerner

        【讨论】:

          【解决方案6】:

          以防万一有人像我一样在 google stackoverflow 上搜索 MQ Listener... 由于 JMS 的实现,这可能不是答案,但这是我一直在寻找的。 像这样的:

          MQQueueConnectionFactory cf = new MQQueueConnectionFactory();
          MQQueueConnection conn = (MQQueueConnection)cf.createQueueConnection();
          MQQueueSession session = (MQQueueSession)conn.createSession(false, 1);
          
          Queue queue = session.createQueue("QUEUE");
          
          MQQueueReceiver receiver = (MQQueueReceiver)session.createReceiver(queue);
          
          receiver.setMessageListener(new YourListener());
          
          conn.start();
          

          YourListener 应该实现 MessageListener 接口,您将通过 onMessage(Message msg) 方法接收消息。

          【讨论】:

            【解决方案7】:

            您好,这是使用 IBM MQ 的消息侦听器的工作示例。这里我也用spring来创建bean等...

            package queue.app;
            
            import javax.annotation.PostConstruct;
            import javax.jms.Message;
            import javax.jms.MessageListener;
            import javax.jms.Queue;
            import javax.jms.QueueConnection;
            import javax.jms.QueueReceiver;
            import javax.jms.QueueSession;
            import javax.jms.Session;
            import javax.jms.TextMessage;
            
            import org.apache.log4j.Logger;
            import org.springframework.beans.factory.annotation.Value;
            import org.springframework.stereotype.Component;
            
            import com.ibm.mq.jms.MQQueue;
            import com.ibm.mq.jms.MQQueueConnectionFactory;
            import com.ibm.msg.client.wmq.WMQConstants;
            
            
            @Component
            public class QueueConsumer implements MessageListener{
            
                private Logger logger = Logger.getLogger(getClass());
            
                MQQueueConnectionFactory qcf = new MQQueueConnectionFactory();
                QueueConnection qc;
                Queue queue;
                QueueSession queueSession;
                QueueReceiver qr;
            
                @Value("${jms.hostName}")
                String jmsHost;
                @Value("${jms.port}")
                String jmsPort;
                @Value("${jms.queue.name}")
                String QUEUE_NAME;
                @Value("${jms.queueManager}")
                String jmsQueueMgr;
                @Value("${jms.username}")
                String jmsUserName;
                @Value("${jms.channel}")
                String jmsChannel;
            
                @PostConstruct
                public void init() throws Exception{
                    qcf.setHostName (jmsHost);
                    qcf.setPort (Integer.parseInt(jmsPort));
                    qcf.setQueueManager (jmsQueueMgr);
                    qcf.setChannel (jmsChannel);
                    qcf.setTransportType (WMQConstants.WMQ_CM_CLIENT);
                    qc = qcf.createQueueConnection ();
            
                    queue = new MQQueue(QUEUE_NAME);
                    qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
                    queueSession = qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
                    qr = queueSession.createReceiver(queue);
                    qr.setMessageListener(this);
                    qc.start();
            
                }
            
            
                @Override
                public void onMessage(Message message) {
                    logger.info("Inside On Message...");
                    long t1 = System.currentTimeMillis();
                    logger.info("Message consumed at ...."+t1);
            
                    try{
                        if(message instanceof TextMessage) {
                            logger.info("String message recieved >> "+((TextMessage) message).getText());
                        }
            
                    }catch(Exception e){
                        e.printStackTrace();
                    }
            
                }
            }
            

            以下是我的依赖项..

            <dependency>
                        <groupId>com.sun.messaging.mq</groupId>
                        <artifactId>fscontext</artifactId>
                        <version>4.2</version>
                        <scope>test</scope>
                    </dependency>
            
                    <dependency>
                        <groupId>com.ibm</groupId>
                        <artifactId>jms</artifactId>
                        <version>1.0</version>
                    </dependency>
            
                    <dependency>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-jms</artifactId>
                        <version>3.2.17.RELEASE</version>
                    </dependency>
            
            
                    <dependency>
                        <groupId>com.ibm</groupId>
                        <artifactId>com.ibm.mq</artifactId>
                        <version>1.0</version>
                    </dependency>
                    <dependency>
                        <groupId>com.ibm</groupId>
                        <artifactId>com.ibm.mq.allclient</artifactId>
                        <version>1.0</version>
                    </dependency>
                    <dependency>
                        <groupId>com.ibm</groupId>
                        <artifactId>com.ibm.mq.jmqi</artifactId>
                        <version>1.0</version>
                    </dependency>
                    <dependency>
                        <groupId>com.ibm</groupId>
                        <artifactId>com.ibm.mqjms</artifactId>
                        <version>1.0</version>
                    </dependency>
            

            【讨论】:

            • 什么是 jmsUserName;.. ??是强制的吗??我只有主机、端口、队列管理器名称、队列名称和通道......就是这样。
            • 你是如何决定和总结依赖关系的。你怎么知道要使用哪些版本? maven中没有1.0版本的allclient jar ..你怎么解决的?
            • 请忽略 jmsUserName 属性,这是用于其他目的。对于依赖项,您需要自己检查兼容版本,这取决于您使用的所有库。对于 allclient jar,您可以参考 mvnrepository.com/artifact/com.ibm.mq/com.ibm.mq.allclient/… 更改组 ID。
            • maven repo 没有您指定的依赖项的版本...对于您指定的每个依赖项 1.0.这是正确的吗?。因为我的 Maven 构建失败了。
            • 我的工件中有自己的重新编译的库,标记为 1.0。您可以从 maven Central 中选择任何可用的工作版本。
            猜你喜欢
            • 2011-01-20
            • 2021-09-17
            • 1970-01-01
            • 2018-01-19
            • 1970-01-01
            • 1970-01-01
            • 2010-12-04
            • 2015-07-20
            • 1970-01-01
            相关资源
            最近更新 更多