【问题标题】:How to handle badly formatted messages on the consumer side when working with Spring Integration and RabbitMQ使用 Spring Integration 和 RabbitMQ 时如何在消费者端处理格式错误的消息
【发布时间】:2015-05-27 11:44:15
【问题描述】:

我目前正在处理一个项目,该项目涉及使用来自 RabbitMQ 代理的消息。但是,我对 Spring Integration、AMQP 和 RabbitMQ 还是很陌生。 我在使用格式错误的消息格式时遇到问题。当我的消费者收到格式错误的消息时,它会将其返回队列,然后 RabbitMQ 将其​​发回,从而创建一个无限循环。 在 Spring Integration 文档中,有一些配置可以实现这种消息不会返回到队列中。

但是我不明白如何实现它。 我想要的是能够配置某种格式像

class ExceptionHandler {

   public void handle(Throwable e ) { 

     Logger.log("Some log ... we don't give a Sh** ... ") ; 

   } 

}

我已经检查了3.9 Exception Handling 部分 和3.15.3 Message Listeners and the Asynchronous Case 但不幸的是我什么都听不懂。

因此,如果您有示例代码或指向其中的链接,我将不胜感激。

【问题讨论】:

    标签: java spring rabbitmq spring-integration


    【解决方案1】:

    是的,这是正确的解决方案之一 - 当您决定消息不应该是 requeued 时,抛出 AmqpRejectAndDontRequeueException

    SimpleMessageListenerContainer上还有defaultRequeueRejected,默认为true

    您也许应该看看DLX/DLQ 解决方案,以免丢失那些格式错误的消息。

    请分享困扰您的 StackTrace。

    SimpleMessageListenerContainer中有这样的代码:

    catch (AmqpRejectAndDontRequeueException rejectEx) {
                        /*
                         *  These will normally be wrapped by an LEFE if thrown by the
                         *  listener, but we will also honor it if thrown by an
                         *  error handler.
                         */
                    }
    

    【讨论】:

    • 你能提供如何使用spring ...吗?
    • 抱歉,没听懂。 “做什么?你的MessagesErrorHandler 已经用AmqpRejectAndDontRequeueException 做了这些事情。你还需要什么?
    【解决方案2】:

    在多次尝试失败后,我能够处理该错误。但是我现在正在努力隐藏异常日志。我不明白为什么要以这种方式实现。我也能够处理日志问题。 事实证明,还有另一种方式表示您不想返回带有acknowledge-mode="NONE" 属性的消息。结帐10.2 Inbound Channel Adapter 部分。这样你甚至不需要抛出那个丑陋的异常。

    < bean id="handler" class="MessagesErrorHandler"/>
        < int-amqp:inbound-channel-adapter
                error-handler="handler"
                id="idActivityAdapter"
                channel="channelName"
                queue-names="activityQueue"
                />
    
    
    
    
    import org.springframework.util.ErrorHandler;
    import org.springframework.amqp.AmqpRejectAndDontRequeueException;
    
    public class MessagesErrorHandler implements ErrorHandler {
    
    
        @Override
        public void handleError(Throwable throwable) {
            System.out.println("YESSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS ERROR IS HANDLED !!!!");
            throw new AmqpRejectAndDontRequeueException(throwable);// this very important
            //so that message don't go back to the queue. 
        }
    }
    

    【讨论】:

      【解决方案3】:

      AmqpRejectAndDontRequeueException 是向容器发出拒绝消息而不重新排队消息的信号;默认情况下,它会重新排队任何异常。

      或者,您可以手动连接SimpleMessageListenerContainer bean;将defaultRequeueRejected 设置为false 并使用container 属性将其添加到适配器。那么,所有的异常都会导致消息被拒绝并且不会被重新排队。

      此外,您可以使用错误通道并从错误流中抛出 AmqpRejectAndDontRequeueException,而不是 error-handler

      【讨论】:

        猜你喜欢
        • 2023-03-29
        • 2016-05-04
        • 2013-06-09
        • 1970-01-01
        • 1970-01-01
        • 2015-10-03
        • 2017-07-31
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多