【问题标题】:Why is consumer not created for ActiveMQ Temp Queue?为什么没有为 ActiveMQ 临时队列创建消费者?
【发布时间】:2012-05-10 19:58:32
【问题描述】:

除了SimpleMessageListenerContainer 选项,不会为临时队列创建消费者。 对于here.面临的一些问题,我不会使用SimpleMessageListenerContainer

以下代码不起作用...(甚至没有创建临时队列)

                        using (IConnection connection = connectionFactory.CreateConnection())
                    using (ISession session = connection.CreateSession())
                    {
                        IDestination destination = SessionUtil.GetDestination(session, aQueueName);
                        var replyDestination = session.CreateTemporaryQueue();

                        // Create a consumer and producer
                        using (IMessageProducer producer = session.CreateProducer(destination))
                        {
                            // Start the connection so that messages will be processed.
                            connection.Start();

                            IBytesMessage request = session.CreateBytesMessage(aMsg);
                            request.NMSReplyTo = replyDestination;

                            IMessageConsumer consumer = session.CreateConsumer(replyDestination);
                            consumer.Listener += new MessageListener(this.OnAckRecieved);

                            // Send a message
                            producer.Send(request);
                            ack = this.autoEvent.WaitOne(this.msgConsumeTimeOut, true);

                            consumer.Close();
                            consumer.Dispose();
                            ConnectionFactoryUtils.GetTargetSession(session).DeleteDestination(replyDestination);
                        }
                        connection.Close();
                        session.Close();

以下代码正在运行:-但队列似乎是持久队列而不是临时队列

                        using (IConnection connection = connectionFactory.CreateConnection())
                    using (ISession session = connection.CreateSession())
                    {
                        IDestination destination = SessionUtil.GetDestination(session, aQueueName);
                        var replyDestination = session.CreateTemporaryQueue();

                        // Create a consumer and producer
                        using (IMessageProducer producer = session.CreateProducer(destination))
                        {
                            // Start the connection so that messages will be processed.
                            connection.Start();

                            IBytesMessage request = session.CreateBytesMessage(aMsg);
                            request.NMSReplyTo = replyDestination;

                            IDestination tempDestination = this.destinationResolver.ResolveDestinationName(session, request.NMSReplyTo.ToString());
                            IMessageConsumer consumer = session.CreateConsumer(tempDestination);
                            consumer.Listener += new MessageListener(this.OnAckRecieved);

                            // Send a message
                            producer.Send(request);
                            ack = this.autoEvent.WaitOne(this.msgConsumeTimeOut, true);

                            consumer.Close();
                            consumer.Dispose();
                            ConnectionFactoryUtils.GetTargetSession(session).DeleteDestination(tempDestination);
                        }
                        connection.Close();
                        session.Close();

使用上面的代码(使用 NmsDestinationAccessor)它正在工作。但它创建了一个持久队列。所以当我直接使用临时队列回复目的地时,它不起作用。

【问题讨论】:

  • “未创建”到底是什么意思,CreateConsumer() 是抛出异常还是只返回 null?
  • 完全没有错误。当我在 webconsole 上看到时,甚至没有为第二个代码创建临时队列。对于第三个代码,只有消费者没有被创建。
  • 从 NMS 项目中添加了一个示例 NUnit 测试以显示它的实际效果。

标签: c# activemq apache-nms


【解决方案1】:
  1. 不要使用 C#,而是用 java 编写代码,因为它是 ActiveMQ 的最佳套件。 阅读here for examples using temp queue in java.
  2. 然后将其编译为 JAR 文件,您可以通过 IKVM.NET 将其导入您的 c# 代码,如 here 所述
  3. 希望它能与此一起使用。

注意:你必须知道你不能在不同的会话中使用临时队列。

【讨论】:

    【解决方案2】:

    直接从NMSReplyTo.ToString 方法创建ActiveMQTempQueue 对象可能会导致您出现问题,因为ToString 方法不能保证返回可以创建匹配目标的值。由于您不知道发件人是否指定了临时目的地或普通目的地,因此它的编码也很糟糕。正确的做法是简单地使用会话的创建消费者方法使用NSMReplyTo 目标创建一个新的消费者。

    这是一个来自 NMS 项目的简单请求响应测试用例,可与 Apache.NMS.Stomp 和 Apache.NMS.ActiveMQ 配合使用。

    namespace Apache.NMS.Test
    {
    [TestFixture]
    public class RequestResponseTest : NMSTestSupport
    {
        protected static string DESTINATION_NAME = "RequestDestination";
    
        [Test]
        [Category("RequestResponse")]       
        public void TestRequestResponseMessaging()
        {
            using(IConnection connection = CreateConnection())
            {
                connection.Start();
                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    IDestination destination = SessionUtil.GetDestination(session, DESTINATION_NAME);
                    ITemporaryQueue replyTo = session.CreateTemporaryQueue();
    
                    using(IMessageConsumer consumer = session.CreateConsumer(destination))
                    using(IMessageProducer producer = session.CreateProducer(destination))
                    {
                        IMessage request = session.CreateMessage();
    
                        request.NMSReplyTo = replyTo;
    
                        producer.Send(request);
    
                        request = consumer.Receive(TimeSpan.FromMilliseconds(3000));
                        Assert.IsNotNull(request);
                        Assert.IsNotNull(request.NMSReplyTo);
    
                        using(IMessageProducer responder = session.CreateProducer(request.NMSReplyTo))
                        {
                            IMessage response = session.CreateTextMessage("RESPONSE");                          
                            responder.Send(response);
                        }                       
                    }
    
                    using(IMessageConsumer consumer = session.CreateConsumer(replyTo))
                    {
                        ITextMessage response = consumer.Receive(TimeSpan.FromMilliseconds(3000)) as ITextMessage;
                        Assert.IsNotNull(response);
                        Assert.AreEqual("RESPONSE", response.Text);
                    }
                }
            }
        }
    }
    

    【讨论】:

    • 无法确定如果没有完整的代码,第三个代码将创建一个消费者,然后在 using 块之后立即处置消费者,这将从代理处取消订阅。
    • 与我的不同之处:1)connection.Start() 放在创建消费者之前。2)connection.createSession 放在 connection.Start() 之前。会是这个原因吗?
    • connection.Start 唯一要做的就是触发向消费者发送消息。如果事情对您不起作用,那么最好的办法是创建一个 NUnit 测试用例并将其附加到 apache 的新 Jira 问题。
    【解决方案3】:

    临时队列仅在创建它的连接存在时才存在。在您的示例代码中,您是在开始连接之前创建它,所以我认为它只是默默地出错,因为没有活动连接。

    【讨论】:

    • 在调用start之前创建它们是合法的,这就是start方法的意图。您通常会在调用 start 之前创建所有目的地、生产者和消费者。
    • 好的,我没有使用过临时队列,他说临时队列没有被创建,所以我猜这可能是问题所在。在您的测试用例中,它们是在连接存在后创建的,因此认为这可能是关键区别。
    猜你喜欢
    • 2018-12-22
    • 1970-01-01
    • 1970-01-01
    • 2016-11-14
    • 2014-06-27
    • 1970-01-01
    • 2023-04-05
    • 2012-08-11
    • 2017-07-02
    相关资源
    最近更新 更多