【问题标题】:Message Retry policies in Spring AMQPSpring AMQP 中的消息重试策略
【发布时间】:2018-01-02 05:50:28
【问题描述】:

我正在使用 spring-amqp 在我的 Web 应用程序中使用来自 RabbitMQ 的消息。 Web 应用程序由多个组件组成,例如 (Redis, OracleDB)

现在我有一个场景,如果由于基础设施(如 Oracle 服务器已关闭、Redis 连接问题)而发生任何异常,我想将消息推送回同一队列并在 特定指定之后延迟我想消费回消息。

经过一定的延迟后,消息也会导致相同的异常,可能我想使用最大尝试次数选项或执行与上述相同的操作,将消息推送回队列并向管理员发送邮件说明“基础设施问题”

Spring AMQP 是否支持上述场景。?如果是,请告诉我如何提出此类或类似的解决方案。

我尝试了下面的代码。消息不用于死信队列,而是重新排队到同一个队列,导致无限循环。请纠正我哪里出错了

配置类

    @Configuration
public class MQConfig {

    public static final String OUTGOING_QUEUE = "my.outgoing.example";

    public static final String INCOMING_QUEUE = "my.incoming.example";

    public static final String DEAD_LETTER_QUEUE = "my.deadletter.queue.example";

    @Autowired
    private ConnectionFactory cachingConnectionFactory;

    // Setting the annotation listeners to use the jackson2JsonMessageConverter
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setMessageConverter(jackson2JsonMessageConverter());
        factory.setDefaultRequeueRejected(false);
        return factory;
    }

    // Standardize on a single objectMapper for all message queue items
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Queue outgoingQueue() {
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-dead-letter-exchange", "dlx");
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE);
        args.put("x-message-ttl", 50000);
        return new Queue(OUTGOING_QUEUE, false, false, false, args);
    }

    @Bean
    public RabbitTemplate outgoingSender() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        rabbitTemplate.setQueue(outgoingQueue().getName());
        // rabbitTemplate.setRoutingKey(outgoingQueue().getName());
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Queue incomingQueue() {
        return new Queue(INCOMING_QUEUE);
    }

    @Bean
    public Queue deadLetterQueue() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    @Bean
    public DirectExchange dlx() {
        return new DirectExchange(DEAD_LETTER_QUEUE);
    }

    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(dlx()).with(DEAD_LETTER_QUEUE);
    }

}

核心逻辑

@Component
public class DeadLetterSendReceive {

  private static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterSendReceive.class);

  @Autowired
  private RabbitTemplate outgoingSender;

  // Scheduled task to send an object every 5 seconds
  @Scheduled(fixedDelay = 5000)
  public void sender() {

    Integer int1[] = new Integer[]{10,20,30,40,50};  
    for (int i = 0; i <= int1.length; i++){
        System.out.println(int1[i]);

        if(int1[i]/10 == 1){
            throw new AmqpRejectAndDontRequeueException("to deadletter queue");
        }
        else{
            ExampleObject ex = new ExampleObject();
            ex.setValue(int1[i]);
             LOGGER.info("Sending example object at " + ex.getValue());
             outgoingSender.convertAndSend(ex);
        }

    }
  }

  // Annotation to listen for an ExampleObject
  @RabbitListener(queues = MQConfig.INCOMING_QUEUE)
  public void handleMessage(ExampleObject exampleObject) {
    LOGGER.info("Received incoming object at " + exampleObject.getValue());
  }

}

Pojo 类

import java.util.Date;

public class ExampleObject {

  private Date date = new Date();
  private int value;

  public int getValue() {
    return value;
}

public void setValue(int value) {
    this.value = value;
}

public ExampleObject() {
  }

  @Override
  public String toString() {
    return "ExampleObject{" +
        "date= " + date +
        '}';
  }

  public Date getDate() {
    return date;
  }

  public void setDate(Date date) {
    this.date = date;
  }

}

【问题讨论】:

    标签: spring-mvc spring-amqp spring-rabbit


    【解决方案1】:

    有几种方法可以做到这一点;使用delayed message exchange plugin 并将失败的消息发布给它。您可以设置一个标头来跟踪已进行了多少次尝试。

    或者您可以使用带有 TTL 的死信队列来执行此操作,其中死信队列配置了死信以将过期消息发送回原始队列。请参阅my answer to this question 及其指向另一个答案的链接。

    您可以使用x-death 标头来跟踪重试次数;最近的代理已将其更改为现在保留计数,而不是不断向标头添加新条目。

    要强制消息转到 DLQ,请将 defaultRequeueRejected 设置为 false 或抛出 AmqpRejectAndDontRequeueException

    【讨论】:

    • 我已经添加了我的代码。请纠正我哪里出错了。
    • 不清楚您的意思-您没有在侦听器中引发异常,因此消息将被正常使用。另外,TTL 在 DLQ 上进行,DLQ 需要死信配置才能在 TTL 过期后路由回主队列。
    猜你喜欢
    • 2020-12-07
    • 1970-01-01
    • 2014-06-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-08-15
    • 2014-03-24
    相关资源
    最近更新 更多