【问题标题】:RabbitMQ Consumer recoveryRabbitMQ 消费者恢复
【发布时间】:2014-11-19 22:59:48
【问题描述】:

我最近开始研究 RabbitMQ。我使用作为我的消费者的 RabbitMQ .net 库创建了一个 Windows 服务。此消费者将用于处理批量处理,例如发送大量电子邮件等。

我通过实现作为 RabbitMQ .net 库一部分的 SimpleRpcServer 类并覆盖 HandleCall/HandleCast 方法来构建它。在消费和处理消息方面,一切都很好。我们已开始研究可用于将此 Windows 服务部署到 Amazon Web Services 的服务器的部署选项。将更新部署到 Windows 服务时,必须停止、更新该服务,然后重新启动。

我的问题是:我该怎么做才能在 Windows 服务上触发 Stop 事件时,服务要么等待所有当前传递给消费者的消息完成处理并重新排队任何已发送但尚未开始处理的消息。

这里是一些示例代码:

public partial class ExampleService: ServiceBase
{    
    private List<Task> _watcherTasks = new List<Task>();
    protected override void OnStart(string[] args)
    {
        var factory = new ConnectionFactory() { 
                HostName = _hostname,
                VirtualHost = _virtualHost,
                UserName = _username,
                Password = _password,
                Ssl = new SslOption
                {
                    Enabled = true,
                    ServerName = _hostname,
                    AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch |
                                            SslPolicyErrors.RemoteCertificateChainErrors
                },
                RequestedHeartbeat = 30
            };
            conn = factory.CreateConnection();

            var emailQueue = requestChannel.QueueDeclare("email", false, false, false, null);
            var emailSub = new Subscription(requestChannel, emailQueue);
            var emailServer = new ServiceBusConsumer(emailSub);
            Task emailWatcher = Task.Run(() => emailServer.MainLoop());            
            _watcherTasks.Add(emailWatcher);
     }

     protected override void OnStop()
     {
          conn.Close();
          Task.WaitAll(_watcherTasks.ToArray(), 60000);    
     }
}

public class ServiceBusConsumer : SimpleRpcServer
{
    public ServiceBusConsumer(Subscription subscription) : base(subscription)
    {

    }
    public override void HandleSimpleCast(bool isRedelivered, RabbitMQ.Client.IBasicProperties requestProperties, byte[] body)
    {
        try
        {
             //Uses some reflection and invokes function to process the message here.
        }
        catch (Exception ex)
        {
             //Creates event log entry of exception
        }
    }
}

【问题讨论】:

    标签: c# windows-services rabbitmq


    【解决方案1】:

    在消息被消费后,您无法将消息显式推送回队列。如果没有发送确认,并且您的队列配置为接收确认,则终止已使用消息的进程应该会导致消息在超时后重新排队。但请考虑,这可能会中断队列中消息的排序。

    例如,假设您发送消息 M1 和 M2,其中 M2 依赖于 M1。如果您在 M1 正在处理时终止您的消费者,而当 M2 仍在队列中时,M1 将在 M2 后面重新排队。当您的消费者重新启动时,这些消息将被乱序处理。

    试图实现这一点会引入过多的复杂性。我只是让您的 Windows 服务完成处理其出队的消息,然后停止侦听队列,并优雅地终止。您的代码应该如下所示:

    while (!stopConsuming) {
        try {
            BasicDeliverEventArgs basicDeliverEventArgs;
            var messageIsAvailable = consumer.Queue.Dequeue(timeout, out basicDeliverEventArgs);
    
            if (!messageIsAvailable) continue;
                var payload = basicDeliverEventArgs.Body;
    
                var message = Encoding.UTF8.GetString(payload);
                OnMessageReceived(new MessageReceivedEventArgs {
                    Message = message,
                    EventArgs = basicDeliverEventArgs
                });
    
                if (implicitAck && !noAck) channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false);
            }
    
            catch (Exception exception) {
                OnMessageReceived(new MessageReceivedEventArgs {
                    Exception = new AMQPConsumerProcessingException(exception)
                });
                if (!catchAllExceptions) Stop();
            }
        }
    }
    

    在此示例中,stopConsuming 变量(如果从另一个线程访问,则标记为volatile)将确定 Windows 服务在处理完当前消息后是否继续从队列中读取消息。如果设置为true,循环将结束,不再有消息出队。

    我即将部署一个 C# 库,它可以完全满足您的要求,等等。请查看我的博客insidethecpu.com 了解详细信息和教程。您可能还对this 感兴趣。

    【讨论】:

    • 消息也可以被拒绝和重新排队,而无需停止消费者。
    • 他们可以,但是如果有一个发布者持续将消息推送到队列中,那么消费者将一直忙于拒绝消息,从而延迟部署过程。这里的重点是及时优雅地停止服务,以便可以在 AWS 上应用更新。
    • 如果你想停止从队列接收消息,那么有 basic.cancel 命令。
    • 当然。这将要求您的消费者进入错误状态,这不是尽可能优雅的退出。这里有很多建议来实现所要求的。
    • 这与我最终所做的非常相似。我选择覆盖 ProcessRequest 函数来检查服务是否正在关闭。如果是这样,它将停止消费消息并优雅地关闭连接。工作得很好。感谢大家的帮助!