【问题标题】:Re-queue message on exception异常时重新排队消息
【发布时间】:2015-11-11 15:55:32
【问题描述】:

我正在寻找一种可靠的方法来重新排队无法正确处理的消息 - 目前。

我一直在查看http://dotnetcodr.com/2014/06/16/rabbitmq-in-net-c-basic-error-handling-in-receiver/,似乎支持在 RabbitMQ API 中重新排队消息。

else //reject the message but push back to queue for later re-try
{
    Console.WriteLine("Rejecting message and putting it back to the queue: {0}", message);
    model.BasicReject(deliveryArguments.DeliveryTag, true);
}

但是我使用的是 EasyNetQ。 所以想知道我会如何在这里做类似的事情。

bus.Subscribe<MyMessage>("my_subscription_id", msg => {
    try
    {
        // do work... could be long running
    }
    catch ()
    {
        // something went wrong - requeue message
    }
});

这甚至是一个好方法吗?不是ACK,如果do work 超过RabbitMQ 服务器等待ACK 的超时时间,该消息可能会导致问题。

【问题讨论】:

    标签: c# rabbitmq publish-subscribe easynetq


    【解决方案1】:

    所以我想出了这个解决方案。它用 EasyNetQ 替换了默认的错误策略。

    public class DeadLetterStrategy : DefaultConsumerErrorStrategy
    {
        public DeadLetterStrategy(IConnectionFactory connectionFactory, ISerializer serializer, IEasyNetQLogger logger, IConventions conventions, ITypeNameSerializer typeNameSerializer)
        : base(connectionFactory, serializer, logger, conventions, typeNameSerializer)
        {
        }
    
        public override AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)
        {
            object deathHeaderObject;
            if (!context.Properties.Headers.TryGetValue("x-death", out deathHeaderObject))
                return AckStrategies.NackWithoutRequeue;
    
            var deathHeaders = deathHeaderObject as IList;
    
            if (deathHeaders == null)
                return AckStrategies.NackWithoutRequeue;
    
            var retries = 0;
            foreach (IDictionary header in deathHeaders)
            {
                var count = int.Parse(header["count"].ToString());
                retries += count;
            }
    
            if (retries < 3)
                return AckStrategies.NackWithoutRequeue;
            return base.HandleConsumerError(context, exception);
        }
    }
    

    你可以这样替换它:

    RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>())
    

    您必须使用AdvancedBus,因此您必须手动设置所有内容。

    using (var bus = RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>()))
    {
        var deadExchange = bus.Advanced.ExchangeDeclare("exchange.text.dead", ExchangeType.Direct);
        var textExchange = bus.Advanced.ExchangeDeclare("exchange.text", ExchangeType.Direct);
        var queue = bus.Advanced.QueueDeclare("queue.text", deadLetterExchange: deadExchange.Name);
        bus.Advanced.Bind(deadExchange, queue, "");
        bus.Advanced.Bind(textExchange, queue, "");
    
        bus.Advanced.Consume<TextMessage>(queue, (message, info) => HandleTextMessage(message, info));
    }
    

    这将使失败的消息死信 3 次。之后它将进入 EasyNetQ 提供的默认错误队列进行错误处理。 您可以订阅该队列。

    当异常从您的使用者方法中传播出来时,消息是死信的。所以这会触发一个死信。

    static void HandleTextMessage(IMessage<TextMessage> textMessage, MessageReceivedInfo info)
    {
        throw new Exception("This is a test!");
    }
    

    【讨论】:

    • 查看您的 DeadLetterStrategy 的实现,在重新提交消息到队列之间没有延迟。我希望在下次重新提交之间有一些运行延迟,例如 2*t 秒延迟(通过乘以 2 等因素来延迟时间的常见策略)。这可以在没有 Thread.Sleep 的情况下完成吗?
    【解决方案2】:

    据我所知,无法通过 EasyNetQ 手动 acknackreject 发送消息。

    我看到你有opened an issue ticket with the EasyNetQ team,关于这个……但还没有答案。

    FWIW,这是一件非常合适的事情。我使用的所有库都支持这个特性集(在 NodeJS 中),它很常见。我很惊讶 EasyNetQ 不支持这个。

    【讨论】:

    • 如果我想要这个功能,似乎我必须使用 RabbitMQ.Client
    猜你喜欢
    • 2015-09-23
    • 2022-09-29
    • 1970-01-01
    • 2013-10-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-11-01
    • 2012-08-27
    相关资源
    最近更新 更多