【问题标题】:Sending a Response to an ActiveMQ Temporary Queue using only the ReplyTo Name仅使用 ReplyTo 名称向 ActiveMQ 临时队列发送响应
【发布时间】:2011-11-11 19:56:04
【问题描述】:

最近,我一直在尝试让回复模式在 Apache NMS /ActiveMQ 中工作,并且在仅使用临时队列的名称向临时队列发送消息时遇到问题。

该项目是调度服务,它从总线检索请求并将它们发送到另一个进程/运行时(基于复杂的路由标准)来处理请求。然后,这个单独的处理器使用回复队列名称和相关 ID 来制作响应并将其发送给同一代理上的原始请求者,但连接不同。

问题在于,如果您有来自消息的 NMSReplyTo 标头的 Idestination 对象引用,您似乎只能发送到临时队列(或主题)。如果该引用丢失,则无法通过简单地使用其名称将消息发送到临时队列(或主题)。

说明这个问题的是这个简单的“Pong”服务,它侦听消息队列并使用 NMS 回复标头的内容向请求者发出响应。它通过简单地调用 ProcessMessage(string,string) 方法来模拟将请求分派给另一个进程。

    using System;
    using Apache.NMS;

    namespace PongService
    {
        /// <summary>Simple request dispatcher which mimics dispatching requests to other workers in "The Cloud"</summary>
        class PongService
        {
            static ISession session = null;
            static IMessageProducer producer = null;

            public static void Main(string[] args)
            {
                Uri connecturi = new Uri("activemq:tcp://localhost:61616");
                Console.WriteLine("Connecting to " + connecturi);

                IConnectionFactory factory = new NMSConnectionFactory(connecturi);
                IConnection connection = factory.CreateConnection();
                session = connection.CreateSession();

                IDestination destination = session.GetQueue("PONG.CMD");
                Console.WriteLine("Using destination: " + destination);

                producer = session.CreateProducer(null);

                IMessageConsumer consumer = session.CreateConsumer(destination);

                connection.Start();

                consumer.Listener += new MessageListener(OnMessage);

                Console.WriteLine("Press any key to terminate Pong service . . .");

                // loop until a key is pressed
                while (!Console.KeyAvailable)
                {
                    try { System.Threading.Thread.Sleep(50); }
                    catch (Exception ex) { Console.Error.WriteLine(ex.Message + "\r\n" + ex.StackTrace); }
                } // loop

                Console.Write("Closing connection...");
                consumer.Close();
                producer.Close();
                session.Close();
                connection.Close();
                Console.WriteLine("done.");
            }


            /// <summary>Consumer call-back which receives requests and dispatches them to available workers in 'The Cloud'</summary>
            /// <param name="receivedMsg">The message received on the request queue.</param>
            protected static void OnMessage(IMessage receivedMsg)
            {
                // mimic the operation of passing this request to an external processor which can connect 
                // to the broker but will not have references to the session objects including destinations
                Console.WriteLine("Sending request to an external processor");
                ProcessMessage(receivedMsg.NMSReplyTo.ToString(), receivedMsg.NMSCorrelationID.ToString());
            }


            /// <summary>Models a worker in another process/runtime.</summary>
            /// <param name="queuename">Where to send the results of processing</param>
            /// <param name="crid">Correlation identifier of the request.</param>
            protected static void ProcessMessage(string queuename, string crid)
            {
                ITextMessage response = session.CreateTextMessage("Pong!");
                response.NMSCorrelationID = crid;

                IDestination destination = session.GetQueue(queuename);

                Console.WriteLine("Sending response with CRID of '" + crid + "' to " + queuename + "'");
                try
                {
                    producer.Send(destination, response);
                }
                catch (Exception ex)
                {
                    Console.Error.WriteLine("Could not send response: " + ex.Message);
                }

            }

        }

    }

现在为客户。它只是创建一个临时队列,开始监听它,然后在我们的“Pong”服务正在监听的队列上发送一个请求。请求消息中包含临时队列的 IDestination。

    using System;
    using System.Threading;
    using Apache.NMS;
    using Apache.NMS.Util;

    namespace PongClient
    {
        class PongClient
        {
            protected static AutoResetEvent semaphore = new AutoResetEvent(false);
            protected static ITextMessage message = null;
            protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(3);

            public static void Main(string[] args)
            {
                Uri connecturi = new Uri("activemq:tcp://localhost:61616");
                Console.WriteLine("About to connect to " + connecturi);

                IConnectionFactory factory = new NMSConnectionFactory(connecturi);

                IConnection connection = factory.CreateConnection();
                ISession session = connection.CreateSession();

                IDestination temporaryDestination = session.CreateTemporaryQueue();
                Console.WriteLine("Private destination: " + temporaryDestination);

                IDestination destination = session.GetQueue("PONG.CMD");
                Console.WriteLine("Service destination: " + destination);


                IMessageConsumer consumer = session.CreateConsumer(destination);
                consumer.Listener += new MessageListener(OnMessage);

                IMessageProducer producer = session.CreateProducer(destination);

                connection.Start();

                // Send a request message
                ITextMessage request = session.CreateTextMessage("Ping");
                request.NMSCorrelationID = Guid.NewGuid().ToString();
                request.NMSReplyTo = temporaryDestination;
                producer.Send(request);

                // Wait for the message
                semaphore.WaitOne((int)receiveTimeout.TotalMilliseconds, true);
                if (message == null)
                {
                    Console.WriteLine("Timed-Out!");
                }
                else
                {
                    Console.WriteLine("Received message with ID:   " + message.NMSMessageId);
                    Console.WriteLine("Received message with text: " + message.Text);
                }
            }



            protected static void OnMessage(IMessage receivedMsg)
            {
                message = receivedMsg as ITextMessage;
                semaphore.Set();
            }
        }
    }

Pong 进程似乎运行正常,只是它最终创建了一个全新的、独立于回复标题中指定的队列。

以下是所涉及技术的版本:

  • Apache.NMS.ActiveMQ v1.5.1
  • Apache.NMS API v1.5.0
  • ActiveMQ 5.5.0
  • C# .NET 3.5

此问题与描述类似问题的this post 有关。希望这些示例也有助于澄清该请求中的问题。

对于解决方案的任何帮助或见解将不胜感激。

【问题讨论】:

    标签: c# activemq nms


    【解决方案1】:

    您实际上并没有在来自 PongClient 的请求消息中设置回复标头。

    试试这个:

    ITextMessage request = session.CreateTextMessage("Ping");
    request.NMSCorrelationID = Guid.NewGuid().ToString();
    request.NMSReplyTo = temporaryDestination;
    producer.Send(request);
    

    【讨论】:

    • 感谢 Jake,我很遗憾没有发布当前版本的代码。我已经发现并尝试了它但没有成功。事实上,在发布问题时发现了这个遗漏。显然旧版本已被缓存。 - 它对你有用吗?
    【解决方案2】:

    你需要使用IDestination你通过了。

    打电话

    IDestination destination = session.GetQueue(queuename); 
    

    有点邪恶。在幕后,它调用 CreateTemporaryQueue() 用一个新的同名临时队列替换现有的临时队列,而不通知您。

    【讨论】:

    • 我使用的是IBM.XMS 8.0.0.4,ISession接口中没有名为GetQueue的方法
    【解决方案3】:

    我建议使用主题作为回复目标,并让您的消费者根据 NMSCorrelationID 进行过滤。这是我在对临时队列感到非常沮丧之后转向的实现。它实际上有很多优点。

    1. 它减少了服务器上密集的资源使用(无需构造/解构临时队列)。
    2. 它允许您使用另一个消费者来监控发回的响应(您永远无法“窥视”临时队列中的内容)。
    3. 而且它更可靠,因为主题可以通过逻辑名称而不是特定令牌 ID(您在连接中丢失)传递。

    【讨论】:

      猜你喜欢
      • 2011-05-03
      • 1970-01-01
      • 2017-09-13
      • 2018-09-14
      • 2018-07-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-10-23
      相关资源
      最近更新 更多