【问题标题】:How to remove message from queue by condition?如何按条件从队列中删除消息?
【发布时间】:2019-07-01 09:32:38
【问题描述】:

我从 IBM MQ 收到一条消息。如何按条件从队列中删除消息?

我尝试设置 gmo.options = CMQC.MQGMO_MSG_UNDER_CURSOR;但这对我没有帮助。

MQQueue queue = queueManager.accessQueue(e.getIbmQueue().trim(), CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_FAIL_IF_QUIESCING | CMQC.MQOO_INQUIRE, null, null, null);

MQGetMessageOptions gmo = new MQGetMessageOptions();
                        gmo.options = MQGMO_ALL_MSGS_AVAILABLE | MQGMO_WAIT | MQGMO_PROPERTIES_AS_Q_DEF | MQGMO_FAIL_IF_QUIESCING | MQOO_INPUT_AS_Q_DEF | MQGMO_SYNCPOINT;
gmo.matchOptions = MQMO_MATCH_CORREL_ID;
gmo.waitInterval = 50000;
byte[] body = null;
while (true) {
    try {
        queue.get(msg, gmo);
        body = new byte[msg.getMessageLength()];
        String businessIdFromIbm = msg.getStringProperty("usr.uuid");
        if (businessIdFromIbm.equals("123")) {
            //delete message
        }
        msg.clearMessage();
}

【问题讨论】:

    标签: java ibm-mq


    【解决方案1】:

    上述示例中的 queue.get 调用是破坏性获取 - 在该调用期间,消息将从逻辑上从队列中删除。

    您正在使用同步点,因此您需要调用 queuemanager.commit() 来完成工作单元(或者如果您调用 queuemanager.disconnect(),同步点将自动提交)。

    MQGMO_MSG_UNDER_CURSOR 仅在您已打开浏览队列 (CMQC.MQOO_BROWSE) 并针对该队列发出至少一次浏览以使您的应用程序在其浏览光标下具有有效消息时才有效。

    如果您想浏览消息,然后使用第二个 queue.get 删除它以删除浏览光标下的消息,您的第一个 queue.get 将需要在 gmo.options 上指定 MQGMO_BROWSE_FIRST 或 MQGMO_BROWSE_NEXT;然后第二个 queue.get 需要提供 MQGMO_MSG_UNDER_CURSOR 作为匹配选项没有任一浏览选项集,以便破坏性地删除浏览的消息。

    MQQueue queue = queueManager.accessQueue(e.getIbmQueue().trim(), CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_BROWSE | CMQC.MQOO_FAIL_IF_QUIESCING | CMQC.MQOO_INQUIRE, null, null, null);
    
    MQGetMessageOptions gmo = new MQGetMessageOptions();
    gmo.options = CMQC.MQGMO_ALL_MSGS_AVAILABLE | CMQC.MQGMO_WAIT | CMQC.MQGMO_PROPERTIES_AS_Q_DEF | CMQC.MQGMO_FAIL_IF_QUIESCING | CMQC.MQGMO_BROWSE_NEXT;
    gmo.matchOptions = MQMO_MATCH_CORREL_ID;
    gmo.waitInterval = 50000;
    byte[] body = null;
    while (true) {
        try {
            queue.get(msg, gmo);
            body = new byte[msg.getMessageLength()];
            String businessIdFromIbm = msg.getStringProperty("usr.uuid");
            if (businessIdFromIbm.equals("123")) {
                //delete message
                MQGetMessageOptions gmo2 = new MQGetMessageOptions();
                gmo2.options =  CMQC.MQGMO_MSG_UNDER_CURSOR | CMQC.MQGMO_FAIL_IF_QUIESCING | CMQC.MQGMO_SYNCPOINT;
                queue.get(msg, gmo2);
                // Be prepared to handle the case where the message has been removed by another application (or expired) and so you receive MQRC 2033
                queueManager.commit();
            }
            msg.clearMessage();
    }
    

    另一种方法(如果存在问题,则需要较少调用队列管理器)是使用 JMS API 到 MQ,并在 usr.uuid 上使用选择器,这样只有具有将此设置为 123 将返回给应用程序。见https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.1.0/com.ibm.mq.dev.doc/q031980_.htm

    【讨论】:

    • 我不认为最后一段是正确的。执行过滤的不是队列管理器,而是 JMS 提供者(即 JMS 客户端库),因此,JMS 客户端库仍将浏览队列中的消息,以基于特定属性查找匹配的消息。
    • 从 V7 开始,队列管理器执行过滤@Roger
    • 但它不适用于 Java 应用程序。
    • @Roger,它不能从 Java API 的类中使用,因为该 API 不提供将适当的 MQOD 发送到队列管理器的方法,但正如 Morag 从 v7 开始所说JMS 的 MQ 类将在打开队列时将选择器传递给队列管理器,以便在 QM 端完成选择。
    【解决方案2】:

    您可以阅读我的完整博客帖子here。这是它的读者摘要版本。

    对于基于 MQ/JMS 消息的过滤存在误解。一些人认为 IBM MQ 队列管理器为 JMS 应用程序做了一些特殊的事情,而这对普通的 Java 或 C/C++/C#/COBOL 应用程序是没有的。这不是真的。阅读来自知识中心的Message selectors in JMS 的第一段以了解更多信息。注意:对于 C/C++/C#/COBOL 应用程序,它们可以使用 MQOD 结构中的 SelectionString 来执行消息选择。

    我已经为 POJO MQ 应用程序创建了一个消息选择器类。

    这是一个 MQTest12MS 的 sn-p,它向您展示了如何使用 MessageSelector 类。

    ms = new MessageSelector(qMgr);
    ms.openQueue(inputQName);
    ms.setFilter("SomeNum", MessageSelector.Conditions.GREATER_THAN_EQUAL, 123);
    
    while (true)
    {
       receiveMsg = ms.getMessage(startAtBeginning);
    
       // got the message, now go and do something with it.
    
       // set flag to continue rather than restart at the beginning.
       startAtBeginning = false;
    }
    ms.closeQueue();
    

    这里是 MessageSelector 类。

    import java.io.IOException;
    
    import com.ibm.mq.MQException;
    import com.ibm.mq.MQGetMessageOptions;
    import com.ibm.mq.MQMessage;
    import com.ibm.mq.MQQueue;
    import com.ibm.mq.MQQueueManager;
    import com.ibm.mq.constants.CMQC;
    
    /**
     * Class Name
     *  MessageSelector
     *
     * Description
     *  This java class will retrieve messages from a queue based on a filter.
     *
     * @author Roger Lacroix
     * @version 1.0.0
     * @license Apache 2 License
     */
    public class MessageSelector
    {
       public enum Conditions
       {
          EQUAL, NOT_EQUAL, LESS_THAN, LESS_THAN_EQUAL, GREATER_THAN, GREATER_THAN_EQUAL;
       }
    
       private MQQueueManager  qMgr = null;
       private MQQueue         inQ = null;
       private String          filterName = null;
       private Conditions      filterCondition;
       private Object          filterValue = null;
    
       /**
        * The constructor
        * @param qMgr - must have a valid/active connection to the queue manager
        */
       public MessageSelector(MQQueueManager qMgr)
       {
          super();
          this.qMgr = qMgr;
       }
    
       /**
        * Open the queue for both browsing and destructive gets.
        * @param qName
        * @throws MQException
        */
       public void openQueue(String qName) throws MQException
       {
          inQ = qMgr.accessQueue(qName, CMQC.MQOO_INQUIRE + CMQC.MQOO_BROWSE + CMQC.MQOO_FAIL_IF_QUIESCING + CMQC.MQOO_INPUT_SHARED);
       }
    
       /**
        * Close the queue.
        * @throws MQException
        */
       public void closeQueue() throws MQException
       {
          if (inQ != null)
             inQ.close();
       }
    
       /**
        * Set the filter name, condition and value.
        * @param name
        * @param condition
        * @param value
        * @throws IllegalArgumentException
        */
       public void setFilter(String name, Conditions condition, Object value) throws IllegalArgumentException
       {
          if (name == null)
             throw new IllegalArgumentException("Filter name cannot be null.");
          else if ("".equals(name))
             throw new IllegalArgumentException("Filter name cannot be blank.");
          else if (value == null)
             throw new IllegalArgumentException("Filter value cannot be null.");
    
          if ( (value instanceof String) || (value instanceof Boolean) ||
               (value instanceof Byte)   || (value instanceof Byte[])  )
           {
              if ( (Conditions.EQUAL != condition) && (Conditions.NOT_EQUAL != condition) )
              {
                 throw new IllegalArgumentException("Filter condition can only be EQUAL or NOT_EQUAL.");
              }
           }
           else if ( (value instanceof Integer) || (value instanceof Long) ||
                     (value instanceof Double)  || (value instanceof Float) )
           {
              if ( (Conditions.EQUAL != condition) && (Conditions.NOT_EQUAL != condition) &&
                   (Conditions.LESS_THAN != condition) && (Conditions.LESS_THAN_EQUAL != condition) &&
                   (Conditions.GREATER_THAN != condition) && (Conditions.GREATER_THAN_EQUAL != condition) )
              {
                 throw new IllegalArgumentException("Filter condition must be one of the following: EQUAL, NOT_EQUAL, LESS_THAN, LESS_THAN_EQUAL, GREATER_THAN, GREATER_THAN_EQUAL.");
              }
           }
           else
           {
              throw new IllegalArgumentException("Unknown Object type for Filter value.");
           }
    
          /**
           * Pass the checks, save the values
           */
          this.filterName = name;
          this.filterCondition = condition;
          this.filterValue = value;
       }
    
       /**
        * Retrieve the next matching message from the queue.
        * @param reset - Start over from the beginning of the queue.
        * @return
        * @throws MQException
        * @throws IOException
        */
       public MQMessage getMessage(boolean reset) throws MQException, IOException
       {
          MQGetMessageOptions gmo = new MQGetMessageOptions();
          if (reset)
             gmo.options = CMQC.MQGMO_BROWSE_FIRST + CMQC.MQGMO_NO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING;
          else
             gmo.options = CMQC.MQGMO_BROWSE_NEXT + CMQC.MQGMO_NO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING;
          MQMessage getMsg = null;
    
          while (true)
          {
             getMsg = new MQMessage();
    
             inQ.get(getMsg, gmo);
    
             if (performConditionalTest(getMsg))
             {
                deleteMessage();
                break;
             }
    
             gmo.options = CMQC.MQGMO_BROWSE_NEXT + CMQC.MQGMO_NO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING;
          }
    
          return getMsg;
       }
    
       /**
        * Handle the conditional testing of the value.
        * @param getMsg
        * @return true/false
        */
       private boolean performConditionalTest(MQMessage getMsg)
       {
          boolean flag = false;
    
          try
          {
             if (filterValue instanceof String)
             {
                String value = getMsg.getStringProperty(filterName);
                if (value != null)
                {
                   if ( (Conditions.EQUAL == filterCondition) && (((String)filterValue).equals(value)) )
                      flag = true;
                   else if ( (Conditions.NOT_EQUAL == filterCondition) && (!(((String)filterValue).equals(value))) )
                      flag = true;
                }
             }
             else if (filterValue instanceof Integer)
             {
                int value = getMsg.getIntProperty(filterName);
    
                if ( (Conditions.EQUAL == filterCondition) && (value == (Integer)filterValue) )
                   flag = true;
                else if ( (Conditions.NOT_EQUAL == filterCondition) && (value != (Integer)filterValue) )
                   flag = true;
                else if ( (Conditions.LESS_THAN == filterCondition) && (value < (Integer)filterValue) )
                   flag = true;
                else if ( (Conditions.LESS_THAN_EQUAL == filterCondition) && (value <= (Integer)filterValue) )
                   flag = true;
                else if ( (Conditions.GREATER_THAN == filterCondition) && (value > (Integer)filterValue) )
                   flag = true;
                else if ( (Conditions.GREATER_THAN_EQUAL == filterCondition) && (value >= (Integer)filterValue) )
                   flag = true;
             }
             else if (filterValue instanceof Long)
             {
                long value = getMsg.getLongProperty(filterName);
    
                if ( (Conditions.EQUAL == filterCondition) && (value == (Long)filterValue) )
                   flag = true;
                else if ( (Conditions.NOT_EQUAL == filterCondition) && (value != (Long)filterValue) )
                   flag = true;
                else if ( (Conditions.LESS_THAN == filterCondition) && (value < (Long)filterValue) )
                   flag = true;
                else if ( (Conditions.LESS_THAN_EQUAL == filterCondition) && (value <= (Long)filterValue) )
                   flag = true;
                else if ( (Conditions.GREATER_THAN == filterCondition) && (value > (Long)filterValue) )
                   flag = true;
                else if ( (Conditions.GREATER_THAN_EQUAL == filterCondition) && (value >= (Long)filterValue) )
                   flag = true;
             }
             else if (filterValue instanceof Double)
             {
                double value = getMsg.getDoubleProperty(filterName);
    
                if ( (Conditions.EQUAL == filterCondition) && (value == (Double)filterValue) )
                   flag = true;
                else if ( (Conditions.NOT_EQUAL == filterCondition) && (value != (Double)filterValue) )
                   flag = true;
                else if ( (Conditions.LESS_THAN == filterCondition) && (value < (Double)filterValue) )
                   flag = true;
                else if ( (Conditions.LESS_THAN_EQUAL == filterCondition) && (value <= (Double)filterValue) )
                   flag = true;
                else if ( (Conditions.GREATER_THAN == filterCondition) && (value > (Double)filterValue) )
                   flag = true;
                else if ( (Conditions.GREATER_THAN_EQUAL == filterCondition) && (value >= (Double)filterValue) )
                   flag = true;
             }
             else if (filterValue instanceof Float)
             {
                float value = getMsg.getFloatProperty(filterName);
    
                if ( (Conditions.EQUAL == filterCondition) && (value == (Float)filterValue) )
                   flag = true;
                else if ( (Conditions.NOT_EQUAL == filterCondition) && (value != (Float)filterValue) )
                   flag = true;
                else if ( (Conditions.LESS_THAN == filterCondition) && (value < (Float)filterValue) )
                   flag = true;
                else if ( (Conditions.LESS_THAN_EQUAL == filterCondition) && (value <= (Float)filterValue) )
                   flag = true;
                else if ( (Conditions.GREATER_THAN == filterCondition) && (value > (Float)filterValue) )
                   flag = true;
                else if ( (Conditions.GREATER_THAN_EQUAL == filterCondition) && (value >= (Float)filterValue) )
                   flag = true;
             }
             else if (filterValue instanceof Boolean)
             {
                Boolean value = getMsg.getBooleanProperty(filterName);
                if ( (value != null) && ((Boolean)filterValue == value) )
                   flag = true;
             }
             else if (filterValue instanceof Byte)
             {
                byte value = getMsg.getByteProperty(filterName);
                if ((Byte)filterValue == value)
                   flag = true;
             }
             else if (filterValue instanceof Byte[])
             {
                byte[] value = getMsg.getBytesProperty(filterName);
                if ( (value != null) && (java.util.Arrays.equals((byte[])filterValue, value)) )
                   flag = true;
             }
          }
          catch (Exception e)
          {}
    
          return flag;
       }
    
       /**
        * Delete the message that the cursor is pointing to.
        */
       private void deleteMessage()
       {
          MQMessage deleteMsg = new MQMessage();
          MQGetMessageOptions gmo = new MQGetMessageOptions();
          gmo.options = CMQC.MQGMO_MSG_UNDER_CURSOR + CMQC.MQGMO_NO_WAIT + CMQC.MQGMO_FAIL_IF_QUIESCING + CMQC.MQGMO_ACCEPT_TRUNCATED_MSG;
    
          /**
           * don't need it - because we already have the message
           * just delete it.
           */
          try
          {
             inQ.get(deleteMsg, gmo, 1);  // only get 1 byte - who cares right!!
          }
          catch (MQException e)
          {}
       }
    }
    

    2019/06/25 更新:我更新了 MessageSelector 类的 setFilter 方法。

    【讨论】:

    • 这似乎说明了如何避免让队列管理器为您进行过滤?队列管理器可以使用名为 SelectionString 的 MQOD 字段过滤消息属性
    • 如果 IBM MQ 团队为 Java 和 C# 应用程序实现了所有可用的 MQ API 调用,那么这不是必需的,但是来自 MQOD 结构的 SelectionString 不适用于 Java 和 C# 应用程序。
    • Java 类是稳定的,因此不会对它们进行任何更改。我不知道为什么 c# 类没有更新——值得 RFE 吗?
    猜你喜欢
    • 2014-06-07
    • 2012-03-20
    • 1970-01-01
    • 2016-09-21
    • 1970-01-01
    • 1970-01-01
    • 2023-03-17
    • 1970-01-01
    • 2015-10-18
    相关资源
    最近更新 更多