【问题标题】:Subscribing to a IBM MQ Topic from a .net client从 .net 客户端订阅 IBM MQ 主题
【发布时间】:2019-07-04 07:56:00
【问题描述】:

如何在订阅者模型中从 .net 客户端订阅 IBM MQ 主题

我已经有一个能够从主题中获取消息的代码的工作副本。

        string qmName = "Q1";
        string hostName = "MyHost";
        string strPort = "1114";
        string channelName = "MyCh";
        string transport = TRP;

        Hashtable connectionProperties = new Hashtable();
        connectionProperties.Add(MQC.HOST_NAME_PROPERTY, hostName);
        connectionProperties.Add(MQC.PORT_PROPERTY, strPort);
        connectionProperties.Add(MQC.CHANNEL_PROPERTY, channelName);

        MQQueueManager mqQueueManager = new MQQueueManager(qmName, connectionProperties);

        string topicString = "TTTT";
        string subscriptionName = "SSS";
        int openOptionsForGet = MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
        MQTopic destForGet = mqQueueManager.AccessTopic(null, null, null, openOptionsForGet, null, subscriptionName);

        MQMessage messageForGet = new MQMessage();
        MQGetMessageOptions gmo = new MQGetMessageOptions();
        gmo.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
        gmo.WaitInterval = 3000;  // wait 60 seconds
        destForGet.Get(messageForGet, gmo);
        string msg = messageForGet.ReadLine();
        System.Console.WriteLine("Received message data : " + msg);

        MessageBox.Show("Received message data : " + msg);

        destForGet.Close();
        mqQueueManager.Disconnect();
        mqQueueManager.Close();

所以上面的代码总是打开和关闭连接但是我想建立一个连接和订阅以在循环/睡眠时间间隔内调用,所以每次没有建立和关闭连接。所以要求是:

  1. 连接一次,然后每1分钟订阅一次主题,读取数据并进行处理。

  2. 截至目前,每当读取消息时,它都会从主题中删除。我想知道如何提交/确认以删除成功处理后的队列消息。

我不想从队列中删除味精。

【问题讨论】:

  • 如果您希望能够在某个工作单元下获取已发布的消息,请将MQc.MQGMO_SYNCPOINT 添加到您的gmo.Options。然后您需要调用mqQueueManager.Commit() 提交消息或调用mqQueueManager.Backout() 将其退出,如果程序崩溃它也会被退出。
  • 感谢@JoshMc!我需要将它保持在线程/循环上,以便它每 1 分钟收到一条消息,而无需打开和关闭连接。我不想为每个阅读的味精做下面的事情。 destForGet.Close(); mqQueueManager.Disconnect(); mqQueueManager.Close();
  • 为什么你不会在一个循环中暂停 1 分钟,在你执行 ReadLine 之后,你可以发出 messageForGet.ClearMessage() 并在 60 秒后执行另一个 destForGet.Get,如果这是你想做的。

标签: .net ibm-mq publish-subscribe


【解决方案1】:

连接一次,然后主题订阅每 1 次 分钟读取数据和处理。

只需使用 while 循环(这不是 MQ 问题)。另外,你为什么要等1分钟?使用 Pub/Sub 甚至点对点消息传递,可以在您的应用程序休眠时构建消息。

到目前为止,每当阅读消息时,它都会从主题中删除。一世 想知道如何提交/确认以删除队列消息发布 处理成功。

您可以使用同步点,但您需要提交或回退消息。

我不想从队列中删除味精。

您为什么使用 Pub/Sub?使用点对点消息传递不是更好吗?即浏览而不是获取。

MQC.MQSO_DURABLE | MQC.MQSO_RESUME;

我强烈建议您阅读MQ Knowledge Center 并了解这些选项的作用。因为耐用和非耐用之间存在天壤之别。

这是一个在托管模式下运行的功能齐全的 C# .NET MQ 程序,用于从主题字符串中获取消息:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using IBM.WMQ;

/// <summary> Program Name
/// MQTest82
///
/// Description
/// This C# class will connect to a remote queue manager
/// and get messages from a topic using a managed .NET environment.
///
/// Sample Command Line Parameters
/// -h 127.0.0.1 -p 1415 -c TEST.CHL -m MQWT1 -t ABC/XYZ -u tester -x mypwd
/// </summary>
/// <author>  Roger Lacroix
/// </author>
namespace MQTest82
{
   public class MQTest82
   {
      private Hashtable inParms = null;
      private Hashtable qMgrProp = null;
      private System.String qManager;
      private System.String topicString;

      /*
      * The constructor
      */
      public MQTest82()
          : base()
      {
      }

      /// <summary> Make sure the required parameters are present.</summary>
      /// <returns> true/false
      /// </returns>
      private bool allParamsPresent()
      {
         bool b = inParms.ContainsKey("-h") && inParms.ContainsKey("-p") &&
                  inParms.ContainsKey("-c") && inParms.ContainsKey("-m") &&
                  inParms.ContainsKey("-t");
         if (b)
         {
            try
            {
               System.Int32.Parse((System.String)inParms["-p"]);
            }
            catch (System.FormatException e)
            {
               b = false;
            }
         }

         return b;
      }

      /// <summary> Extract the command-line parameters and initialize the MQ variables.</summary>
      /// <param name="args">
      /// </param>
      /// <throws>  IllegalArgumentException </throws>
      private void init(System.String[] args)
      {
         inParms = System.Collections.Hashtable.Synchronized(new System.Collections.Hashtable(14));
         if (args.Length > 0 && (args.Length % 2) == 0)
         {
            for (int i = 0; i < args.Length; i += 2)
            {
               inParms[args[i]] = args[i + 1];
            }
         }
         else
         {
            throw new System.ArgumentException();
         }

         if (allParamsPresent())
         {
            qManager = ((System.String)inParms["-m"]);
            topicString = ((System.String)inParms["-t"]);

            qMgrProp = new Hashtable();
            qMgrProp.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_MANAGED);

            qMgrProp.Add(MQC.HOST_NAME_PROPERTY, ((System.String)inParms["-h"]));
            qMgrProp.Add(MQC.CHANNEL_PROPERTY, ((System.String)inParms["-c"]));

            try
            {
               qMgrProp.Add(MQC.PORT_PROPERTY, System.Int32.Parse((System.String)inParms["-p"]));
            }
            catch (System.FormatException e)
            {
               qMgrProp.Add(MQC.PORT_PROPERTY, 1414);
            }

            if (inParms.ContainsKey("-u"))
               qMgrProp.Add(MQC.USER_ID_PROPERTY, ((System.String)inParms["-u"]));

            if (inParms.ContainsKey("-x"))
               qMgrProp.Add(MQC.PASSWORD_PROPERTY, ((System.String)inParms["-x"]));

            logger("Parameters:");
            logger("  QMgrName ='" + qManager + "'");
            logger("  Topic String ='" + topicString + "'");

            logger("QMgr Property values:");
            foreach (DictionaryEntry de in qMgrProp)
            {
               logger("  " + de.Key + " = '" + de.Value + "'");
            }
         }
         else
         {
            throw new System.ArgumentException();
         }
      }

      /// <summary> Connect, open topic, get messages, close topic and disconnect. </summary>
      ///
      private void testReceive()
      {
         MQQueueManager qMgr = null;
         MQTopic inTopic = null;
         int openOptionsForGet = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_MANAGED | MQC.MQSO_NON_DURABLE;

         try
         {
            qMgr = new MQQueueManager(qManager, qMgrProp);
            logger("successfully connected to " + qManager);

            inTopic = qMgr.AccessTopic(topicString, null, MQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, openOptionsForGet);
            logger("successfully opened " + topicString);

            testLoop(inTopic);
         }
         catch (MQException mqex)
         {
            logger("CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
         }
         catch (System.IO.IOException ioex)
         {
            logger("ioex=" + ioex);
         }
         finally
         {
            try
            {
               if (inTopic != null)
                  inTopic.Close();
               logger("closed: " + topicString);
            }
            catch (MQException mqex)
            {
               logger("CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
            }

            try
            {
               if (qMgr != null)
                  qMgr.Disconnect();
               logger("disconnected from " + qManager);
            }
            catch (MQException mqex)
            {
               logger("CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
            }
         }
      }

      private void testLoop(MQTopic inTopic)
      {
         bool flag = true;
         MQGetMessageOptions gmo = new MQGetMessageOptions();
         gmo.Options |= MQC.MQGMO_NO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
         MQMessage msg = null;

         while (flag)
         {
            try
            {
               msg = new MQMessage();
               inTopic.Get(msg, gmo);
               if (msg.Feedback == MQC.MQFB_QUIT)
               {
                  flag = false;
                  logger("received quit message - exiting loop");
               }
               else
                  logger("Message Data: " + msg.ReadString(msg.MessageLength));
            }
            catch (MQException mqex)
            {
               logger("CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
               if (mqex.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
               {
                  // no meesage - life is good - loop again
                  logger("sleeping");
                  Thread.Sleep(60*1000);  // sleep for 60 seconds
               }
               else
               {
                  flag = false;  // severe error - time to exit
               }
            }
            catch (System.IO.IOException ioex)
            {
               logger("ioex=" + ioex);
            }
         }
      }

      private void logger(String data)
      {
         DateTime myDateTime = DateTime.Now;
         System.Console.Out.WriteLine(myDateTime.ToString("yyyy/MM/dd HH:mm:ss.fff") + " " + this.GetType().Name + ": " + data);
      }

      /// <summary> main line</summary>
      /// <param name="args">
      /// </param>
      //        [STAThread]
      public static void Main(System.String[] args)
      {
         MQTest82 write = new MQTest82();

         try
         {
            write.init(args);
            write.testReceive();
         }
         catch (System.ArgumentException e)
         {
            System.Console.Out.WriteLine("Usage: MQTest82 -h host -p port -c channel -m QueueManagerName -t topicString [-u userID] [-x passwd]");
            System.Environment.Exit(1);
         }
         catch (MQException e)
         {
            System.Console.Out.WriteLine(e);
            System.Environment.Exit(1);
         }

         System.Environment.Exit(0);
      }
   }
}

【讨论】:

    猜你喜欢
    • 2016-12-14
    • 2017-01-11
    • 2019-12-21
    • 2020-04-26
    • 1970-01-01
    • 1970-01-01
    • 2018-09-03
    • 2011-02-09
    • 1970-01-01
    相关资源
    最近更新 更多