【问题标题】:Purge IBM MQ Queue programmatically以编程方式清除 IBM MQ 队列
【发布时间】:2020-04-23 15:16:07
【问题描述】:

有没有办法以编程方式清除 IBM MQ Queue?队列中的消息很少,但是当我使用 Consumer code 读取消息时,消息仍然存在于队列中。我假设队列中存在一些未提交的消息。我无权访问 MQ 资源管理器,因此我想以编程方式清除队列。 (通过JMS代码或IBM MQ实现方式)

目前我的消费者有 jar 文件 com.ibm.mq-6.0.2.1.jar 所以我更喜欢使用 WMQ 类而不是 JMS。

【问题讨论】:

  • 你的代码是什么样的?编辑并将其添加到问题中。如果您尝试将其放在评论中,这将无济于事。
  • 我知道我使用的是旧版本的客户端 jar。假设我迁移到新的 jar,它是所有客户端的 jar。现在我如何以编程方式实现它。我使用的当前 jar 版本也没有以编程方式清除队列的功能?
  • 您可以从队列中获取消息,也可以使用 PCF 接口。了解你在做什么会让别人知道你做错了什么。
  • 我正在尝试以编程方式清除 MQ 队列。忘记我正在使用的客户端版本 jar。假设我使用所有客户端最新的 jar .. 现在如何以编程方式清除队列。 ?
  • 如果你不能展示你的代码,至少给我们一个关于它做什么的线索。 @JoshMc 提供了两种可能的方法。您告诉我们您的代码无法正常工作,但我们不知道您的代码做了什么以便进一步提供建议。也许至少显示一些描述您的应用程序所做的伪代码是一种解决感知代码发布问题的方法?

标签: java jms ibm-mq


【解决方案1】:

这是一个功能齐全的 Java/MQ 程序,名为“EmptyQ.java”,它将删除队列中的所有消息,直到队列为空。注意:这是我在here 发布的示例 MQ/Java 程序之一。

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;

import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;

/**
 * Program Name: 
 *  EmptyQ
 *
 * Description: 
 * This java class will connect to a remote queue manager with the
 * MQ setting stored in a HashTable, loop to retrieve (delete) all messages from
 * a queue then close and disconnect.
 *
 * Sample Command Line Parameters: 
 * bindings mode: 
 *  -m MQA1 -q TEST.Q1 
 *  
 * client mode:
 *  -m MQA1 -q TEST.Q1 -h 127.0.0.1 -p 1414 -c TEST.CHL -u UserID -x Password
 *
 * @author Roger Lacroix
 */
public class EmptyQ
{
   private static final SimpleDateFormat LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");

   private Hashtable<String, String> params;
   private Hashtable<String, Object> mqht;

   /**
    * The constructor
    */
   public EmptyQ()
   {
      super();
      params = new Hashtable<String, String>();
      mqht = new Hashtable<String, Object>();
   }

   /**
    * Make sure the required parameters are present.
    * 
    * @return true/false
    */
   private boolean allParamsPresent()
   {
      boolean b = params.containsKey("-m") && params.containsKey("-q");

      if (params.containsKey("-c"))
      {
         b = b && params.containsKey("-c") && params.containsKey("-h") && params.containsKey("-p");
      }

      if (b)
      {
         try
         {
            if (params.containsKey("-p"))
               Integer.parseInt((String) params.get("-p"));
         }
         catch (NumberFormatException e)
         {
            b = false;
         }
      }

      return b;
   }

   /**
    * Extract the command-line parameters and initialize the MQ HashTable.
    * 
    * @param args
    * @throws IllegalArgumentException
    */
   private void init(String[] args) throws IllegalArgumentException
   {
      int port = 1414;
      if (args.length > 0 && (args.length % 2) == 0)
      {
         for (int i = 0; i < args.length; i += 2)
         {
            params.put(args[i], args[i + 1]);
         }
      }
      else
      {
         throw new IllegalArgumentException();
      }

      if (allParamsPresent())
      {
         if (params.containsKey("-c"))
         {
            try
            {
               port = Integer.parseInt((String) params.get("-p"));
            }
            catch (NumberFormatException e)
            {
               port = 1414;
            }

            mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
            mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
            mqht.put(CMQC.PORT_PROPERTY, new Integer(port));
            if (params.containsKey("-u"))
               mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
            if (params.containsKey("-x"))
               mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));
         }

         // I don't want to see MQ exceptions at the console.
         MQException.log = null;
      }
      else
      {
         throw new IllegalArgumentException();
      }
   }

   /**
    * Connect, open queue, loop and get all messages then close queue and
    * disconnect.
    *
    */
   private void receive()
   {
      String qMgrName = (String) params.get("-m");
      String inputQName = (String) params.get("-q");
      MQQueueManager qMgr = null;
      MQQueue queue = null;
      int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF + CMQC.MQOO_INQUIRE + CMQC.MQOO_FAIL_IF_QUIESCING;
      MQGetMessageOptions gmo = new MQGetMessageOptions();
      gmo.options = CMQC.MQGMO_FAIL_IF_QUIESCING + CMQC.MQGMO_ACCEPT_TRUNCATED_MSG;
      MQMessage receiveMsg = null;
      int msgCount = 0;
      boolean getMore = true;

      try
      {
         if (params.containsKey("-c"))
            qMgr = new MQQueueManager(qMgrName, mqht);
         else
            qMgr = new MQQueueManager(qMgrName);
         EmptyQ.logger("successfully connected to " + qMgrName);

         queue = qMgr.accessQueue(inputQName, openOptions);
         EmptyQ.logger("successfully opened " + inputQName);

         while (getMore)
         {
            receiveMsg = new MQMessage();

            try
            {
               // get the message on the queue - request only 1 byte - make it go as fast as possible.
               queue.get(receiveMsg, gmo, 1);
               msgCount++;
            }
            catch (MQException e)
            {
               if ( (e.completionCode == CMQC.MQCC_FAILED) && 
                    (e.reasonCode == CMQC.MQRC_NO_MSG_AVAILABLE) )
               {
                  // All messages read.
                  getMore = false;
                  break;
               }
               else if ( (e.completionCode == CMQC.MQCC_WARNING) && 
                         (e.reasonCode == CMQC.MQRC_TRUNCATED_MSG_ACCEPTED) )
               {
                  msgCount++;
               }
               else
               {
                  EmptyQ.logger("MQException: " + e.getLocalizedMessage());
                  EmptyQ.logger("CC=" + e.completionCode + " : RC=" + e.reasonCode);
                  getMore = false;
                  break;
               }
            }
         }
      }
      catch (MQException e)
      {
         EmptyQ.logger("CC=" + e.completionCode + " : RC=" + e.reasonCode);
      }
      finally
      {
         EmptyQ.logger("deleted " + msgCount + " messages");

         try
         {
            if (queue != null)
            {
               queue.close();
               EmptyQ.logger("closed: " + inputQName);
            }
         }
         catch (MQException e)
         {
            EmptyQ.logger("CC=" + e.completionCode + " : RC=" + e.reasonCode);
         }
         try
         {
            if (qMgr != null)
            {
               qMgr.disconnect();
               EmptyQ.logger("disconnected from " + qMgrName);
            }
         }
         catch (MQException e)
         {
            EmptyQ.logger("CC=" + e.completionCode + " : RC=" + e.reasonCode);
         }
      }
   }

   /**
    * A simple logger method
    * 
    * @param data
    */
   public static void logger(String data)
   {
      String className = Thread.currentThread().getStackTrace()[2].getClassName();

      // Remove the package info.
      if ((className != null) && (className.lastIndexOf('.') != -1))
         className = className.substring(className.lastIndexOf('.') + 1);

      System.out.println(LOGGER_TIMESTAMP.format(new Date()) + " " + className + ": " + Thread.currentThread().getStackTrace()[2].getMethodName() + ": " + data);
   }

   /**
    * main line
    * 
    * @param args
    */
   public static void main(String[] args)
   {
      EmptyQ write = new EmptyQ();

      try
      {
         write.init(args);
         write.receive();
      }
      catch (IllegalArgumentException e)
      {
         System.err.println("Usage: java EmptyQ -m QueueManagerName -q QueueName [-h host -p port -c channel] [-u UserID] [-x Password]");
         System.exit(1);
      }

      System.exit(0);
   }
}

【讨论】:

  • 你有最新版本的 MQRead.java 和 MQWrite.java,我在那个网站上看到也是你写的,对最新的 API 方法参考有很大帮助。如果您编写了上述类的新版本,请分享它
  • @rinilnath MQ/Java API 方法没有改变。在 MQ v7(我认为)中,IBM 更改了常量所在的位置。只需更改“MQC”。到“CMQC”。在 MQRead 和 MQWrite 中,然后它应该编译。
  • 感谢 CMQC 删除了弃用警告。但 MQException.MQRC_NO_MSG_AVAILABLE 中还有另一处弃用。你能建议替代方案吗
  • 我强烈建议您开始进行互联网搜索并阅读 MQ 知识中心。您使用 Google 获取MQException.MQRC_NO_MSG_AVAILABLE 的第一个链接是什么?它是 MQ KnowLedge Center (ibm.com/support/knowledgecenter/SSFKSJ_9.2.0/…) 中的一个页面的链接,上面写着 MQException.MQRC_NO_MSG_AVAILABLE 已贬值以及使用什么。即 已弃用。改用 MQConstants.MQRC_NO_MSG_AVAILABLE。 或者你可以使用 CMQC 类。
【解决方案2】:

我最近在我的项目中实现了这个。

使用 WMQ

浏览队列,破坏性地使用队列中的消息。

在 MQMessageoptions 中使用 CMQC.MQGMO_MSG_UNDER_CURSOR 从队列中破坏性地删除消息。

https://www.ibm.com/docs/en/ibm-mq/9.0?topic=java-mqc

MQGetMessageOptions getOptions = new MQGetMessageOptions();
getOptions.options = CMQC.MQGMO_MSG_UNDER_CURSOR + CMQC.MQGMO_NO_WAIT
                                + CMQC.MQGMO_FAIL_IF_QUIESCING + CMQC.MQGMO_ACCEPT_TRUNCATED_MSG;

【讨论】:

    【解决方案3】:

    像这样简单的东西怎么样?

    import java.io.IOException;
    import com.ibm.mq.MQException;
    import com.ibm.mq.MQMessage;
    import com.ibm.mq.MQPutMessageOptions;
    import com.ibm.mq.MQQueue;
    import com.ibm.mq.MQQueueManager;
    import com.ibm.mq.constants.MQConstants;
    import com.ibm.msg.client.wmq.WMQConstants;
    
    public class MQClear {
    
      private static final String qManager = "QM1";
      private static final String qName = "Q1";
    
      public static void main(String args[]) {
        try {
    
          MQQueueManager qMgr = new MQQueueManager(qManager);
          int openOptions = MQConstants.MQOO_INPUT_AS_Q_DEF;
          MQQueue queue = qMgr.accessQueue(qName, openOptions);
    
          // not great: while (queue.getCurrentDepth()>0) {
          boolean hasMore = true;
          while (hasMore) {
    
            try {
              MQMessage mqMsg = new MQMessage();
              queue.get(mqMsg);
            }
            catch (MQException ex) {
              hasMore = false;
              if( ex.reasonCode!=2033 ) throw ex; // if this was something other than NO_MSG_AVAILABLE, rethrow
            }
          }
    
          queue.close();
          qMgr.disconnect(); 
        }
        catch (MQException ex) {
    
          System.out.println("A WebSphere MQ Error occured : Completion Code " + ex.completionCode
              + " Reason Code " + ex.reasonCode);
          ex.printStackTrace();
        }
      }
    }
    

    【讨论】:

    • 您永远不应该从基于“当前深度”的队列中获取消息。当前深度包括未提交的消息。此外,当前深度值是特定时刻队列深度的快照。在抛出原因码 2033 (MQRC_NO_MSG_AVAILABLE) 的异常之前,循环和获取消息要好得多。
    • 是的,我首先考虑了这种方法(获取消息直到没有更多消息),但不幸的是 MQ API 方法 get() 有点麻烦,因为它会引发期望而不是返回 null (或错误代码)如果没有更多消息。 JMS 在这方面更好。不管怎样,我已经更改了代码。
    猜你喜欢
    • 2021-10-03
    • 1970-01-01
    • 1970-01-01
    • 2011-06-27
    • 2010-11-06
    • 2011-03-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多