【问题标题】:Rabbit MQ - Recovery of connection/channel/consumerRabbit MQ - 连接/通道/消费者的恢复
【发布时间】:2015-11-13 18:06:45
【问题描述】:

我正在创建一个在无限循环中运行以从队列中读取消息的消费者。我正在寻找有关如何在我的无限循环中恢复 abd continue 的建议/示例代码,即使存在网络中断也是如此。消费者必须保持运行,因为它将作为 WindowsService 安装。

1) 谁能解释一下如何正确使用这些设置?它们有什么区别?

NetworkRecoveryInterval 
AutomaticRecoveryEnabled
RequestedHeartbeat

2) 请查看我当前为消费者提供的示例代码。我正在使用 .Net RabbitMQ 客户端 v3.5.6。

以上设置将如何为我“恢复”? 例如consumer.Queue.Dequeue 会阻塞直到它被恢复? 这似乎不对 所以...

我必须手动为此编码吗?例如consumer.Queue.Dequeue 会抛出一个我必须检测并手动重新创建我的连接、通道和消费者的异常吗?还是只是消费者,因为“AutomaticRecovery”会为我恢复频道?

这是否意味着我应该将消费者创建移到 while 循环中?频道创建怎么样?以及连接创建?

3) 假设我必须手动执行一些恢复代码,是否有事件回调(以及如何注册它们)告诉我存在网络问题?

谢谢!

public void StartConsumer(string queue)
{
            using (IModel channel = this.Connection.CreateModel())
            {
                var consumer = new QueueingBasicConsumer(channel);
                const bool noAck = false;
                channel.BasicConsume(queue, noAck, consumer);

                // do I need these conditions? or should I just do while(true)???
                while (channel.IsOpen &&        
                       Connection.IsOpen &&     
                       consumer.IsRunning)
                {
                    try
                    {
                        BasicDeliverEventArgs item;
                        if (consumer.Queue.Dequeue(Timeout, out item))
                        {
                            string message = System.Text.Encoding.UTF8.GetString(item.Body);
                            DoSomethingMethod(message);
                            channel.BasicAck(item.DeliveryTag, false);
                        }
                    }
                    catch (EndOfStreamException ex)
                    {   
                        // this is likely due to some connection issue -- what am I to do?
                    }
                    catch (Exception ex)
                    {   
                        // should never happen, but lets say my DoSomethingMethod(message); throws an exception
                        // presumably, I'll just log the error and keep on going
                    }
                }
            }
}

        public IConnection Connection
        {
            get
            {
                if (_connection == null) // _connection defined in class -- private static IConnection _connection;
                {
                     _connection = CreateConnection();
                }
                return _connection;
            }
        }

        private IConnection CreateConnection()
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "RabbitMqHostName",
                UserName = "RabbitMqUserName",
                Password = "RabbitMqPassword",
            };

            // why do we need to set this explicitly? shouldn't this be the default?
            factory.AutomaticRecoveryEnabled = true;

            // what is a good value to use?
            factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(5); 

            // what is a good value to use? How is this different from NetworkRecoveryInterval?
            factory.RequestedHeartbeat = 5; 

            IConnection connection = factory.CreateConnection();
            return connection;
        }

【问题讨论】:

    标签: c# rabbitmq


    【解决方案1】:

    RabbitMQ 功能

    documentation on RabbitMQ's site 实际上非常好。如果您想恢复队列、交换器和消费者,您正在寻找默认启用的拓扑恢复。自动恢复(enabled by default)包括:

    • 重新连接
    • 恢复连接监听器
    • 重新打开频道
    • 恢复频道监听器
    • 恢复频道basic.qos设置、发布者确认和交易设置

    NetworkRecoveryInterval 是执行自动恢复重试之前的时间量(默认为 5 秒)。

    Heartbeat 还有另一个目的,即识别死 TCP 连接。在 RabbitMQ 的网站上有more to read about that

    代码示例

    为恢复编写可靠的代码很棘手。 EndOfStreamException (正如您所怀疑的)最有可能是由于网络问题。如果您使用management plugin,您可以通过从那里关闭连接并查看是否触发了异常来重现此情况。对于类似生产的应用程序,您可能希望拥有一组在连接失败的情况下交替使用的代理。如果您有多个 RabbitMQ 代理,您可能还想保护自己免受一台或多台服务器上的长期服务器故障。您可能希望实施错误策略,例如重新排队消息或使用死信交换。

    我一直在考虑这些事情并编写了一个瘦客户端RawRabbit,它可以处理其中的一些事情。也许它可能适合你?如果没有,我建议您将QueueingBasicConsumer 更改为EventingBasicConsumer。它是事件驱动的,而不是线程阻塞的。

    var eventConsumer = new EventingBasicConsumer(channel);
    eventConsumer.Received += (sender, args) =>
    {
        var body = args.Body;
        eventConsumer.Model.BasicAck(args.DeliveryTag, false);
    };
    channel.BasicConsume(queue, false, eventConsumer);
    

    如果您激活了拓扑恢复,消费者将被 RabbitMQ 客户端恢复并重新开始接收消息。 要进行更精细的控制,请连接ConsumerCancelledShutdown 的事件处理程序以检测连接问题,并连接Registered 以了解何时可以再次使用消费者。

    【讨论】:

    • 感谢您的详细回复。我认为你是对的,QueuingBasicConsumer 不是要走的路,会看看 EventingBasicConsumer。
    猜你喜欢
    • 2016-10-25
    • 1970-01-01
    • 1970-01-01
    • 2017-12-29
    • 1970-01-01
    • 1970-01-01
    • 2015-02-14
    • 1970-01-01
    • 2023-03-06
    相关资源
    最近更新 更多