【问题标题】:RabbitMQ C# driver stops receiving messagesRabbitMQ C# 驱动程序停止接收消息
【发布时间】:2012-09-19 16:42:49
【问题描述】:

您是否有任何指示如何确定订阅问题何时发生以便我可以重新连接?

我的服务使用 RabbitMQ.Client.MessagePatterns.Subscription 进行订阅。一段时间后,我的客户默默地停止接收消息。我怀疑网络问题,因为我的 VPN 连接不是最可靠的。

我已经通读了一段时间的文档,寻找一个密钥,以找出此订阅何时可能由于网络问题而被破坏,但运气不佳。我试过检查连接和通道是否仍然打开,但它似乎总是报告它仍然打开。

它处理的消息工作得很好,并且被确认回队列,所以我不认为这是“ack”的问题。

我确定我一定只是遗漏了一些简单的东西,但我还没有找到它。

public void Run(string brokerUri, Action<byte[]> handler)
{
    log.Debug("Connecting to broker: {0}".Fill(brokerUri));
    ConnectionFactory factory = new ConnectionFactory { Uri = brokerUri };

    using (IConnection connection = factory.CreateConnection())
    {
        using (IModel channel = connection.CreateModel())
        {
            channel.QueueDeclare(queueName, true, false, false, null);

            using (Subscription subscription = new Subscription(channel, queueName, false))
            {
                while (!Cancelled)
                {
                    BasicDeliverEventArgs args;

                    if (!channel.IsOpen)
                    {
                        log.Error("The channel is no longer open, but we are still trying to process messages.");
                        throw new InvalidOperationException("Channel is closed.");
                    }
                    else if (!connection.IsOpen)
                    {
                        log.Error("The connection is no longer open, but we are still trying to process message.");
                        throw new InvalidOperationException("Connection is closed.");
                    }

                    bool gotMessage = subscription.Next(250, out args);

                    if (gotMessage)
                    {
                        log.Debug("Received message");
                        try
                        {
                            handler(args.Body);
                        }
                        catch (Exception e)
                        {
                            log.Debug("Exception caught while processing message. Will be bubbled up.", e);
                            throw;
                        }

                        log.Debug("Acknowledging message completion");
                        subscription.Ack(args);
                    }
                }
            }
        }
    }
}

更新:

我通过在虚拟机中运行服务器来模拟网络故障,当我断开连接时,我确实得到了一个异常足够长,所以也许这不是网络问题。现在我不知道它会是什么,但运行几个小时后它就失败了。

【问题讨论】:

    标签: c# rabbitmq


    【解决方案1】:

    编辑:由于我对此表示赞同,我应该指出 .NET RabbitMQ 客户端现在内置了此功能:https://www.rabbitmq.com/dotnet-api-guide.html#connection-recovery

    理想情况下,您应该能够使用它并避免手动实现重新连接逻辑。


    我最近不得不实施几乎相同的事情。据我所知,关于 RabbitMQ 的大多数可用信息都假设您的网络非常可靠,或者您在与任何发送或接收消息的客户端相同的机器上运行 RabbitMQ 代理,从而允许 Rabbit 处理任何连接问题。

    将 Rabbit 客户端设置为对断开连接具有鲁棒性并不难,但您需要处理一些特殊情况。

    你需要做的第一件事是打开心跳:

    ConnectionFactory factory = new ConnectionFactory() 
    {
      Uri = brokerUri,
      RequestedHeartbeat = 30,
    }; 
    

    将“RequestedHeartbeat”设置为 30 将使客户端每 30 秒检查一次连接是否仍然有效。如果不启用此功能,消息订阅者将愉快地坐在那里等待另一条消息进来,而不会知道它的连接已经坏了。

    打开心跳也会使服务器检查连接是否仍然正常,这可能非常重要。如果在订阅者接收到消息之后但在确认之前连接变坏,则服务器只是假设客户端花费了很长时间,并且消息“卡在”死连接上,直到它被关闭。打开心跳后,服务器将识别连接何时出现故障并关闭它,将消息放回队列中,以便另一个订阅者可以处理它。如果没有心跳,我不得不手动进入并关闭 Rabbit 管理 UI 中的连接,以便将卡住的消息传递给订阅者。

    其次,您需要处理OperationInterruptedException。正如您所注意到的,这通常是 Rabbit 客户端在发现连接中断时会抛出的异常。如果在连接中断时调用IModel.QueueDeclare(),这是您将得到的异常。通过处理您的订阅、频道和连接并创建新的来处理此异常。

    最后,您必须处理消费者在尝试使用来自已关闭连接的消息时所做的事情。不幸的是,在 Rabbit 客户端中,从队列中消费消息的每种不同方式似乎反应不同。如果您在关闭的连接上调用 QueueingBasicConsumer.Queue.DequeueQueueingBasicConsumer 将抛出 EndOfStreamExceptionEventingBasicConsumer 什么都不做,因为它只是在等待消息。从我的尝试中可以看出,您正在使用的Subscription 类似乎从对Subscription.Next 的调用返回true,但args 的值为null。再一次,通过处理你的连接、频道和订阅并重新创建它们来处理这个问题。

    connection.IsOpen 的值将在连接失败且心跳打开时更新为 False,因此您可以根据需要进行检查。但是,由于心跳在单独的线程上运行,您仍然需要处理连接在检查时打开但在调用subscription.Next() 之前关闭的情况。

    最后要注意的是IConnection.Dispose()。如果在连接关闭后调用 dispose,此调用将引发 EndOfStreamException。这对我来说似乎是一个错误,我不喜欢在 IDisposable 对象上不调用 dispose,所以我调用它并吞下异常。

    将所有这些放在一个快速而肮脏的例子中:

    public bool Cancelled { get; set; }
    
    IConnection _connection = null;
    IModel _channel = null;
    Subscription _subscription = null;
    
    public void Run(string brokerUri, string queueName, Action<byte[]> handler)
    {
        ConnectionFactory factory = new ConnectionFactory() 
        {
            Uri = brokerUri,
            RequestedHeartbeat = 30,
        };
    
        while (!Cancelled)
        {               
            try
            {
                if(_subscription == null)
                {
                    try
                    {
                        _connection = factory.CreateConnection();
                    }
                    catch(BrokerUnreachableException)
                    {
                        //You probably want to log the error and cancel after N tries, 
                        //otherwise start the loop over to try to connect again after a second or so.
                        continue;
                    }
    
                    _channel = _connection.CreateModel();
                    _channel.QueueDeclare(queueName, true, false, false, null);
                    _subscription = new Subscription(_channel, queueName, false);
                }
    
                BasicDeliverEventArgs args;
                bool gotMessage = _subscription.Next(250, out args);
                if (gotMessage)
                {
                    if(args == null)
                    {
                        //This means the connection is closed.
                        DisposeAllConnectionObjects();
                        continue;
                    }
    
                    handler(args.Body);
                    _subscription.Ack(args);
                }
            }
            catch(OperationInterruptedException ex)
            {
                DisposeAllConnectionObjects();
            }
        }
        DisposeAllConnectionObjects();
    }
    
    private void DisposeAllConnectionObjects()
    {
        if(_subscription != null)
        {
            //IDisposable is implemented explicitly for some reason.
            ((IDisposable)_subscription).Dispose();
            _subscription = null;
        }
    
        if(_channel != null)
        {
            _channel.Dispose();
            _channel = null;
        }
    
        if(_connection != null)
        {
            try
            {
                _connection.Dispose();
            }
            catch(EndOfStreamException) 
            {
            }
            _connection = null;
        }
    }
    

    【讨论】:

    • 哇。这看起来很棒。今天早上我已经在我的服务中编码并部署了它。你为我节省了大量时间。
    猜你喜欢
    • 1970-01-01
    • 2019-10-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-04-15
    • 2016-05-24
    相关资源
    最近更新 更多