【问题标题】:Negative Acknowledgement to RabbitMQ Queue to Re-Queue the Message using Spring AMQP对 RabbitMQ 队列的否定确认以使用 Spring AMQP 重新排队消息
【发布时间】:2017-01-23 09:35:07
【问题描述】:

我有一个应用程序,它使用 spring AMQP 来向其他应用程序消费和生成消息。 我有一个场景发生了一些异常,我需要重新排队回到 RabbitMQ。对于一些我需要忽略的异常(基本上我需要忽略不需要重新排队的消息)

目前在下面的代码中, 我已将配置设置为

factory.setDefaultRequeueRejected(false);

但我的要求是动态拒绝某些消息并重新排队返回 RabbitMQ 以获取某些消息。

请推荐

@Bean(name="rabbitListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        DefaultClassMapper classMapper = new DefaultClassMapper();
        Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
        idClassMapping.put(Constants.JOB_TYPE_ID_, JobListenerDTO.class);
        classMapper.setIdClassMapping(idClassMapping);
        messageConverter.setClassMapper(classMapper);
        factory.setMessageConverter(messageConverter);
        factory.setDefaultRequeueRejected(false);
        factory.setReceiveTimeout(10L);
        return factory;
    }

【问题讨论】:

    标签: spring-amqp spring-rabbit


    【解决方案1】:

    你不能那样做(默认为 false)。

    要选择性地执行此操作,您必须将 defaultRequeueRejected 设置为 true 并将 AmqpRejectAndDontRequeueRejected 扔给您想要丢弃的任何内容。

    您可以将所需的逻辑封装在ErrorHandler 中。

    默认错误处理程序完全针对特定的异常列表执行此操作,例如 documented here - 您可以注入自定义 FatalExceptionStrategy

    但对于有条件拒绝,defaultRequeueRejected 必须是 true

    编辑

    factory.setErrorHandler(new ConditionalRejectingErrorHandler(t -> {
            Throwable cause = t.getCause();
            return cause instanceof MessageConversionException
                    || cause instanceof org.springframework.messaging.converter.MessageConversionException
                    || cause instanceof MethodArgumentNotValidException
                    || cause instanceof MethodArgumentTypeMismatchException
                    || cause instanceof NoSuchMethodException
                    || cause instanceof ClassCastException
                    || cause instanceof MyBadXMLException;
        }));
    

    这会将MyBadXMLException 添加到标准列表中。

    如果您使用的不是 Java 8,请使用 new FatalExceptionStrategy() {...}

    【讨论】:

    • 感谢您的回复。能否请您提供一个示例来展示 ErrorHandler 的使用?
    • 还有一个查询,我从生产者那里得到了一个 xml 输入。如果 xml 数据错误,我会收到 XML 异常。只有这个例外我不需要重新排队(如何做到这一点?)。休息我需要重新排队的所有例外。正如你所提到的,我会将defaultRequeueRejected 设置为true
    • 我从 rabbitmq 链接中读到,The AMQP specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them. Spring AMQP 不支持吗?
    • 容器会为你做这件事。 Spring AMQP 是对 ampq 的抽象。如果监听器正常退出,容器发送basicAck。如果监听器抛出异常,它会发送basicReject。是否重新排队取决于defaultRequeueRejected,但可以通过抛出AmqpRejectAndDontRequeueException 来覆盖(如果为真)。
    • 如果你真的想对监听器中的确认负责,请将确认模式设置为MANUAL,并查看this answer 了解如何操作。但大多数人让容器来做。
    最近更新 更多