【问题标题】:Spring-boot-starter RabbitMQ global error handlingSpring-boot-starter RabbitMQ 全局错误处理
【发布时间】:2017-07-02 02:06:03
【问题描述】:

我正在使用 spring-boot-starter-amqp 1.4.2。生产者和消费者工作正常,但有时传入的 JSON 消息的语法不正确。这会导致以下(正确的)异常:

org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
Caused by:  org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token...

将来我可能会面临更多的例外。所以我想配置一个全局错误处理程序,这样如果任何一个消费者有任何异常,我都可以全局处理它。

注意:在这种情况下,消息根本没有到达消费者。我想在整个消费者中处理这类异常。

请找到以下代码:

RabbitConfiguration.java

@Configuration
@EnableRabbit
public class RabbitMqConfiguration {

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Bean
    public MessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    @Primary
    public RabbitTemplate rabbitTemplate()
    {
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

}

消费者

@RabbitListener(
        id = "book_queue",
        bindings = @QueueBinding(
                value = @Queue(value = "book.queue", durable = "true"),
                exchange = @Exchange(value = "book.exchange", durable = "true", delayed = "true"),
                key = "book.queue"
        )
    )
public void handle(Message message) {
//Business Logic
}

谁能帮助我在全球范围内处理错误处理程序。您的帮助应该是可观的。

根据 Gary 的评论更新问题

我可以运行你的例子并得到你所说的预期输出,我只是想根据你的例子尝试更多的负面案例,但我无法理解一些事情,

this.template.convertAndSend(queue().getName(), new Foo("bar"));

输出

收到:Foo [foo=bar]

上面的代码工作正常。现在我发送一些其他 bean 而不是“Foo”

this.template.convertAndSend(queue().getName(), new Differ("snack","Hihi","how are you"));

输出

收到:Foo [foo=null]

消费者不应接受此消息,因为它是完全不同的 bean(Differ.class 不是 Foo.class),所以我希望它应该转到“ConditionalRejectingErrorHandler”。为什么它接受错误的有效负载并打印为 null ?如果我错了,请纠正我。

编辑 1:

Gary,正如你所说,我在发送消息时设置了标题“TypeId”,但消费者仍然可以转换错误消息并且它没有抛出任何错误......请找到代码下面,我使用了你的代码示例,只是做了以下修改,

1) 发送消息时添加“__TypeId__”,

this.template.convertAndSend(queue().getName(), new Differ("snack","hihi","how are you"),m -> {
        m.getMessageProperties().setHeader("__TypeId__","foo");
        return m;
    }); 

2) 在“Jackson2JsonMessageConverter”中添加了“DefaultClassMapper”

@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class);
    converter.setClassMapper(mapper);
    return new Jackson2JsonMessageConverter();
}    

【问题讨论】:

    标签: spring-boot rabbitmq rabbitmq-exchange spring-rabbit


    【解决方案1】:

    覆盖 Boot 的侦听器容器工厂 bean,如 Enable Listener Endpoint Annotations 中所述。

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(myErrorHandler());
        ...
        return factory;
    }
    

    您可以注入 ErrorHandler 的自定义实现,它将添加到工厂创建的每个侦听器容器中。

    void handleError(Throwable t);
    

    throwable 将是 ListenerExecutionFailedException,从版本 1.6.7(启动 1.4.4)开始,其 failedMessage 属性中包含原始入站消息。

    默认错误处理程序认为MessageConversionException 等原因是致命的(它们不会被重新排队)。

    如果您希望保留该行为(对于此类问题是正常的),您应该在处理错误后抛出AmqpRejectAndDontRequeueException

    顺便说一句,你不需要那个RabbitTemplate bean;如果您的应用程序中只有一个MessageConverter bean,启动会自动将其连接到容器和模板中。

    由于您将覆盖引导的工厂,您必须在那里连接转换器。

    编辑

    您可以使用默认的ConditionalRejectingErrorHandler,但使用FatalExceptionStrategy 的自定义实现注入它。实际上,您可以继承它的DefaultExceptionStrategy 并覆盖isFatal(Throwable t),然后在处理错误后返回super.isFatal(t)

    EDIT2

    完整示例;发送 1 条好消息和 1 条坏消息:

    package com.example;
    
    import org.slf4j.Logger;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
    import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.util.ErrorHandler;
    
    @SpringBootApplication
    public class So42215050Application {
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
            context.getBean(So42215050Application.class).runDemo();
            context.close();
        }
    
        @Autowired
        private RabbitTemplate template;
    
        private void runDemo() throws Exception {
            this.template.convertAndSend(queue().getName(), new Foo("bar"));
            this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
                return new Message("some bad json".getBytes(), m.getMessageProperties());
            });
            Thread.sleep(5000);
        }
    
        @RabbitListener(queues = "So42215050")
        public void handle(Foo in) {
            System.out.println("Received: " + in);
        }
    
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(jsonConverter());
            factory.setErrorHandler(errorHandler());
            return factory;
        }
    
        @Bean
        public ErrorHandler errorHandler() {
            return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
        }
    
        @Bean
        public Queue queue() {
            return new Queue("So42215050", false, false, true);
        }
    
        @Bean
        public MessageConverter jsonConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
        public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
    
            private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());
    
            @Override
            public boolean isFatal(Throwable t) {
                if (t instanceof ListenerExecutionFailedException) {
                    ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                    logger.error("Failed to process inbound message from queue "
                            + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                            + "; failed message: " + lefe.getFailedMessage(), t);
                }
                return super.isFatal(t);
            }
    
        }
    
        public static class Foo {
    
            private String foo;
    
            public Foo() {
                super();
            }
    
            public Foo(String foo) {
                this.foo = foo;
            }
    
            public String getFoo() {
                return this.foo;
            }
    
            public void setFoo(String foo) {
                this.foo = foo;
            }
    
            @Override
            public String toString() {
                return "Foo [foo=" + this.foo + "]";
            }
    
        }
    }
    

    结果:

    Received: Foo [foo=bar]
    

    2017-02-14 09:42:50.972 错误 44868 --- [cTaskExecutor-1] 5050Application$MyFatalExceptionStrategy:无法处理来自队列 So42215050 的入站消息;失败的消息:(正文:'some bad json' MessageProperties [headers={TypeId=com.example.So42215050Application$Foo},timestamp=null,messageId=null,userId=null,receivedUserId=null, appId=null,clusterId=null,type=null,correlationId=null,correlationIdString=null,replyTo=null,contentType=application/json,contentEncoding=UTF-8,contentLength=0,deliveryMode=null,receivedDeliveryMode=PERSISTENT,expiration= null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=So42215050, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-P2QqY0PMD1ppX5NnkUPhFA, consumerQueue=So42215050])

    EDIT3

    JSON 不传达任何类型信息。默认情况下,要转换的类型将从方法参数类型中推断出来。如果您希望拒绝任何无法转换为该类型的内容,则需要适当地配置消息转换器。

    例如:

    @Bean
    public MessageConverter jsonConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        DefaultClassMapper mapper = new DefaultClassMapper();
        mapper.setDefaultType(Foo.class);
        converter.setClassMapper(mapper);
        return converter;
    }
    

    现在,当我将示例更改为发送 Bar 而不是 Foo...

    public static class Bar {
    
       ...
    
    }
    

    this.template.convertAndSend(queue().getName(), new Bar("baz"));
    

    我明白了……

    Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
    ... 13 common frames omitted
    Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.So42215050Application$Bar] to [com.example.So42215050Application$Foo] for GenericMessage [payload=Bar [foo=baz], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=So42215050, amqp_contentEncoding=UTF-8, amqp_deliveryTag=3, amqp_consumerQueue=So42215050, amqp_redelivered=false, id=6d7e23a3-c2a7-2417-49c9-69e3335aa485, amqp_consumerTag=amq.ctag-6JIGkpmkrTKaG32KVpf8HQ, contentType=application/json, __TypeId__=com.example.So42215050Application$Bar, timestamp=1488489538017}]
    

    但这只有在发件人设置__TypeId__ 标头时才有效(如果模板配置了相同的适配器,则模板会这样做)。

    EDIT4

    @SpringBootApplication
    public class So42215050Application {
    
        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
            context.getBean(So42215050Application.class).runDemo();
            context.close();
        }
    
        @Autowired
        private RabbitTemplate template;
    
        private void runDemo() throws Exception {
            this.template.convertAndSend(queue().getName(), new Foo("bar")); // good - converter sets up type
            this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
                return new Message("some bad json".getBytes(), m.getMessageProperties()); // fail bad json
            });
            Message message = MessageBuilder
                    .withBody("{\"foo\":\"bar\"}".getBytes())
                    .andProperties(
                            MessagePropertiesBuilder
                                .newInstance()
                                .setContentType("application/json")
                                .build())
                    .build();
            this.template.send(queue().getName(), message); // Success - default Foo class when no header
            message.getMessageProperties().setHeader("__TypeId__", "foo");
            this.template.send(queue().getName(), message); // Success - foo is mapped to Foo
            message.getMessageProperties().setHeader("__TypeId__", "bar");
            this.template.send(queue().getName(), message); // fail - mapped to a Map
            Thread.sleep(5000);
        }
    
        @RabbitListener(queues = "So42215050")
        public void handle(Foo in) {
            logger.info("Received: " + in);
        }
    
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(jsonConverter());
            factory.setErrorHandler(errorHandler());
            return factory;
        }
    
        @Bean
        public ErrorHandler errorHandler() {
            return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
        }
    
        @Bean
        public Queue queue() {
            return new Queue("So42215050", false, false, true);
        }
    
        @Bean
        public MessageConverter jsonConverter() {
            Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
            DefaultClassMapper mapper = new DefaultClassMapper();
            mapper.setDefaultType(Foo.class);
            Map<String, Class<?>> mappings = new HashMap<>();
            mappings.put("foo", Foo.class);
            mappings.put("bar", Object.class);
            mapper.setIdClassMapping(mappings);
            converter.setClassMapper(mapper);
            return converter;
        }
    
        public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
    
            private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());
    
            @Override
            public boolean isFatal(Throwable t) {
                if (t instanceof ListenerExecutionFailedException) {
                    ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                    logger.error("Failed to process inbound message from queue "
                            + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                            + "; failed message: " + lefe.getFailedMessage(), t);
                }
                return super.isFatal(t);
            }
    
        }
    
        public static class Foo {
    
            private String foo;
    
            public Foo() {
                super();
            }
    
            public Foo(String foo) {
                this.foo = foo;
            }
    
            public String getFoo() {
                return this.foo;
            }
    
            public void setFoo(String foo) {
                this.foo = foo;
            }
    
            @Override
            public String toString() {
                return "Foo [foo=" + this.foo + "]";
            }
    
        }
    
        public static class Bar {
    
            private String foo;
    
            public Bar() {
                super();
            }
    
            public Bar(String foo) {
                this.foo = foo;
            }
    
            public String getFoo() {
                return this.foo;
            }
    
            public void setFoo(String foo) {
                this.foo = foo;
            }
    
            @Override
            public String toString() {
                return "Bar [foo=" + this.foo + "]";
            }
    
        }
    
    }
    

    【讨论】:

    • 非常感谢您的回复。需要更多澄清,我从您的回答中无法理解。1)要实现全局错误处理程序,我们应该有一个 bean“SimpleRabbitListenerContainerFactory”?(没有其他办法) 2)我可以将“ErrorHandler”视为一种方法。是否可以将bean定义为“ErrorHandler”?您能否分享代码示例以获取编写 ErrorHandler 的有效方法?至少是一个提示或链接。 3)您已经提到,拥有一个“MessageConverter”是多余的,并且启动会将其自动连接到容器。对我来说,spring-boot 不会自动这样做,我错过了什么吗?
    • 现在可以在spring-amqp-samples 中使用spring-rabbit-global-errorhandler
    • 非常感谢。让我试一试,一旦工作我会立即接受答案
    • 抱歉,回复很晚...您的示例运行良好,但如果我通过 "this.template.convertAndSend(queue().getName(), new Differ("snack","hihi ","你好吗"));", Rabbit 正在接受这个有效载荷,但它应该拒绝,因为我的消费者期望“Foo”,但我传递的是“Differ”,它是不同的类和不同的属性,我可以知道为什么它不是来到“ConditionalRejectingErrorHandler”?
    • 不要将代码放入 cmets,它不可读 - 请改为编辑您的问题。不清楚你在问什么;您不会在 template.send 上收到错误,因为发送方不知道消费者想要什么。如果您的意思是您也没有在消费者方面看到它,请附上 DEBUG 日志以进行交付。
    猜你喜欢
    • 2019-11-08
    • 2020-09-16
    • 2014-12-29
    • 1970-01-01
    • 2014-04-07
    • 2018-11-19
    • 2017-06-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多