【问题标题】:RabbitMQ Queue with no subscribers没有订阅者的 RabbitMQ 队列
【发布时间】:2011-12-18 14:46:48
【问题描述】:

“持久”和“持久模式”似乎与重新启动有关,而不是与没有订阅者接收消息有关。

我希望 RabbitMQ 在没有订阅者时将消息保留在队列中。当订阅者确实上线时,该订阅者应该会收到消息。 RabbitMQ 可以做到这一点吗?

代码示例:

服务器:

namespace RabbitEg
{
    class Program
    {
        private const string EXCHANGE_NAME = "helloworld";

        static void Main(string[] args)
        {
            ConnectionFactory cnFactory = new RabbitMQ.Client.ConnectionFactory() { HostName = "localhost" };

            using (IConnection cn = cnFactory.CreateConnection())
            {
                using (IModel channel = cn.CreateModel())
                {
                    //channel.ExchangeDelete(EXCHANGE_NAME);
                    channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true);
                    //channel.BasicReturn += new BasicReturnEventHandler(channel_BasicReturn);

                    for (int i = 0; i < 100; i++)
                    {
                        byte[] payLoad = Encoding.ASCII.GetBytes("hello world _ " + i);
                        IBasicProperties channelProps = channel.CreateBasicProperties();
                        channelProps.SetPersistent(true);

                        channel.BasicPublish(EXCHANGE_NAME, "routekey_helloworld", false, false, channelProps, payLoad);

                        Console.WriteLine("Sent Message " + i);
                        System.Threading.Thread.Sleep(25);
                    }

                    Console.ReadLine();
                }
            }
        }
    }
}

客户:

namespace RabbitListener
{
    class Program
    {
        private const string EXCHANGE_NAME = "helloworld";

        static void Main(string[] args)
        {
            ConnectionFactory cnFactory = new ConnectionFactory() { HostName = "localhost" };

            using (IConnection cn = cnFactory.CreateConnection())
            {
                using (IModel channel = cn.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true);

                    string queueName = channel.QueueDeclare("myQueue", true, false, false, null);
                    channel.QueueBind(queueName, EXCHANGE_NAME, "routekey_helloworld");

                    Console.WriteLine("Waiting for messages");

                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queueName, true, consumer);

                    while (true)
                    {
                        BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        Console.WriteLine(Encoding.ASCII.GetString(e.Body));
                    }
                }
            }
        }
    }
}

【问题讨论】:

    标签: c# rabbitmq


    【解决方案1】:

    请参阅AMQP Reference,了解durablepersistent 的含义。

    基本上,队列要么是durable,要么是non-durable。前者存活代理重启,后者没有。

    消息以transientpersistent 发布。这个想法是durable 队列上的persistent 消息也应该在代理重新启动后仍然存在。

    因此,要获得您想要的,您需要 1) 将队列声明为 durable 和 2) 将消息发布为 persistent。此外,您可能还希望在频道上启用发布者确认;这样,您就会知道代理何时承担了消息的责任。

    【讨论】:

    • 谢谢,我试过了,但如果没有客户端监听,它仍然不会保留消息。附加到问题的代码示例。
    • 好代码。两个问题:1)服务器应该声明队列;声明两次不是问题,这是一种很好的做法,并且 2) queueDeclare() 为您提供了一个匿名的非持久队列;你想要 queueDeclare("myQueue", true, false, false, null)。
    • 另外,您编辑问题的方式让人难以理解您想要达到的目标。
    • 关于编辑的观点(相应更新)。我已经尝试按照您的指示指定队列(更新代码示例以证明这一点)。不过对我来说还是一样的结果。
    • 啊,明白了...显然在客户端上声明队列为时已晚,因为消息已经在队列中。将相同的 QueueDeclare 和 QueueBind 代码添加到服务器代码使其工作。感谢您的洞察力(非常感谢)。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-06-19
    • 2016-12-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多