【问题标题】:consume all messages from IBM MQ使用来自 IBM MQ 的所有消息
【发布时间】:2021-07-23 04:12:54
【问题描述】:

我想消费来自 MQ 的所有消息。

public static void main(String[] args)
{ 
 JMSContext context = null;
 Destination destination = null;
 JMSConsumer consumer = null;

 JmsFactoryFactory FF = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
 JmsConnectionFactor CF = FF.createConnectionFactory();
  context = CF.createContext();
  destination = context.createQueue(QUEUE_NAME);
  consumer = context.createConsumer(destination);
  String msg = consumer.receiveBody(String.class, 15090);
  System.out.println(msg);
 }

它只能阅读一条消息。如何使用所有消息?另外,有没有更简单的方法可以删除队列中的所有消息,甚至不读取或使用它们?

【问题讨论】:

    标签: java jms ibm-mq mq


    【解决方案1】:

    有没有更简单的方法来删除队列中的所有消息,甚至不 阅读或消费它们?

    是的。您可以使用 MQ PCF Clear Queue 命令,只要没有应用程序为输入或输出打开队列。即 IPPROCS 和 OPPROCS 必须为零才能正常工作。这也适用于 Morag 的 MQSC Clear 命令。

    这是一个用于清除队列的功能齐全的 Java MQ PCF 程序。

    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Hashtable;
    
    import com.ibm.mq.MQException;
    import com.ibm.mq.MQQueueManager;
    import com.ibm.mq.constants.CMQC;
    import com.ibm.mq.constants.CMQCFC;
    import com.ibm.mq.headers.MQDataException;
    import com.ibm.mq.headers.pcf.PCFMessage;
    import com.ibm.mq.headers.pcf.PCFMessageAgent;
    
    /**
     * Program Name
     *  MQClearQueue01
     *
     * Description
     *  This java class issues a PCF "Clear Q" command for a queue to delete all messages 
     *  in the queue of a remote queue manager.
     *
     * Sample Command Line Parameters
     *  -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
     *
     * @author Roger Lacroix
     */
    public class MQClearQueue01
    {
       private static final SimpleDateFormat  LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
    
       private Hashtable<String,String> params;
       private Hashtable<String,Object> mqht;
    
       public MQClearQueue01()
       {
          super();
          params = new Hashtable<String,String>();
          mqht = new Hashtable<String,Object>();
       }
    
       /**
        * Make sure the required parameters are present.
        * @return true/false
        */
       private boolean allParamsPresent()
       {
          boolean b = params.containsKey("-h") && params.containsKey("-p") &&
                      params.containsKey("-c") && params.containsKey("-m") &&
                      params.containsKey("-q") &&
                      params.containsKey("-u") && params.containsKey("-x");
          if (b)
          {
             try
             {
                Integer.parseInt((String) params.get("-p"));
             }
             catch (NumberFormatException e)
             {
                b = false;
             }
          }
    
          return b;
       }
    
       /**
        * Extract the command-line parameters and initialize the MQ HashTable.
        * @param args
        * @throws IllegalArgumentException
        */
       private void init(String[] args) throws IllegalArgumentException
       {
          int port = 1414;
          if (args.length > 0 && (args.length % 2) == 0)
          {
             for (int i = 0; i < args.length; i += 2)
             {
                params.put(args[i], args[i + 1]);
             }
          }
          else
          {
             throw new IllegalArgumentException();
          }
    
          if (allParamsPresent())
          {
             try
             {
                port = Integer.parseInt((String) params.get("-p"));
             }
             catch (NumberFormatException e)
             {
                port = 1414;
             }
    
             mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
             mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
             mqht.put(CMQC.PORT_PROPERTY, new Integer(port));
             mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
             mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));
    
             // I don't want to see MQ exceptions at the console.
             MQException.log = null;
          }
          else
          {
             throw new IllegalArgumentException();
          }
       }
    
       /**
        * Handle connecting to the queue manager, issuing PCF command then
        * looping through PCF response messages and disconnecting from
        * the queue manager.
        */
       private void doPCF()
       {
          MQQueueManager  qMgr   = null;
          PCFMessageAgent agent  = null;
          PCFMessage   request   = null;
          PCFMessage[] responses = null;
          String qMgrName  = (String) params.get("-m");
          String queueName = (String) params.get("-q");
    
          try
          {
             qMgr = new MQQueueManager(qMgrName, mqht);
             MQClearQueue01.logger("successfully connected to "+ qMgrName);
    
             agent = new PCFMessageAgent(qMgr);
             MQClearQueue01.logger("successfully created agent");
    
             // https://www.ibm.com/support/knowledgecenter/SSFKSJ_latest/com.ibm.mq.ref.adm.doc/q087420_.html
             request = new PCFMessage(CMQCFC.MQCMD_CLEAR_Q);
    
             request.addParameter(CMQC.MQCA_Q_NAME, queueName);
    
             responses = agent.send(request);
    
             MQClearQueue01.logger("responses.length="+responses.length);
    
             for (int i = 0; i < responses.length; i++)
             {
                if ((responses[i]).getCompCode() == CMQC.MQCC_OK)
                   MQClearQueue01.logger("Successfully cleared queue '"+queueName+"' of messages.");
                else
                   MQClearQueue01.logger("Error: Failed to clear queue '"+queueName+"' of messages.");
             }
          }
          catch (MQException e)
          {
             MQClearQueue01.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
          }
          catch (IOException e)
          {
             MQClearQueue01.logger("IOException:" +e.getLocalizedMessage());
          }
          catch (MQDataException e)
          {
             MQClearQueue01.logger("MQDataException:" +e.getLocalizedMessage());
          }
          finally
          {
             try
             {
                if (agent != null)
                {
                   agent.disconnect();
                   MQClearQueue01.logger("disconnected from agent");
                }
             }
             catch (MQDataException e)
             {
                MQClearQueue01.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
             }
    
             try
             {
                if (qMgr != null)
                {
                   qMgr.disconnect();
                   MQClearQueue01.logger("disconnected from "+ qMgrName);
                }
             }
             catch (MQException e)
             {
                MQClearQueue01.logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
             }
          }
       }
    
       /**
        * A simple logger method
        * @param data
        */
       public static void logger(String data)
       {
          String className = Thread.currentThread().getStackTrace()[2].getClassName();
    
          // Remove the package info.
          if ( (className != null) && (className.lastIndexOf('.') != -1) )
             className = className.substring(className.lastIndexOf('.')+1);
    
          System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data);
       }
    
       public static void main(String[] args)
       {
          MQClearQueue01 mqcq = new MQClearQueue01();
    
          try
          {
             mqcq.init(args);
             mqcq.doPCF();
          }
          catch (IllegalArgumentException e)
          {
             MQClearQueue01.logger("Usage: java MQClearQueue01 -m QueueManagerName -h host -p port -c channel -q QueueName -u UserID -x Password");
             System.exit(1);
          }
    
          System.exit(0);
       }
    }
    

    【讨论】:

      【解决方案2】:

      如果正如您的问题所暗示的那样,您只想清空队列中的所有消息,而不是在应用程序中实际读取它们,您可以考虑简单地使用管理 MQSC 命令:-

      CLEAR QLOCAL(queue-name)
      

      您可以将其键入runmqsc 工具以将其发送给队列管理器。

      【讨论】:

        【解决方案3】:

        JMS API 一次使用一条消息,因此您需要将 receiveBody 放入一个循环中,例如:

        public static void main(String[] args) { 
           JMSContext context = null;
           Destination destination = null;
           JMSConsumer consumer = null;
        
           JmsFactoryFactory FF = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
           JmsConnectionFactor CF = FF.createConnectionFactory();
           context = CF.createContext();
           destination = context.createQueue(QUEUE_NAME);
           consumer = context.createConsumer(destination);
           String msg = null;
           do {
              msg = consumer.receiveBody(String.class, 15090);
              System.out.println(msg);
           } while (msg != null);
        }
        

        receiveBody 返回null 时,表示队列中没有更多消息。

        JMS API 没有定义从队列中删除所有消息的任何方法,但大多数 JMS 服务器都有一个特定于实现的管理 API,您可以通过它执行这些操作。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2016-02-18
          • 1970-01-01
          • 2021-08-02
          • 1970-01-01
          • 1970-01-01
          • 2020-05-06
          • 1970-01-01
          相关资源
          最近更新 更多