【问题标题】:Disable DLQ and re-delivery for ActivemMQ messages禁用 DLQ 并重新传递 ActivemMQ 消息
【发布时间】:2017-07-15 07:44:55
【问题描述】:

我正在开发系统,将要处理的消息推送到 ActiveMQ 中。我有严格的要求,消费者必须按传入顺序处理消息。如果消费者的消息处理失败,它需要回滚/恢复并继续无限重试。只有当一条消息处理成功时,消费者才需要提交并继续下一条消息。

如何防止回滚消息自动转发到 DLQ 以及针对此类需求配置重新传递策略的正确方法是什么?

【问题讨论】:

    标签: activemq


    【解决方案1】:

    当设置 RedeliveryPolicy 无限重试时,消息将永远不会发送到 DLQ。

    policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);

    使用ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE,您可以一一确认消息。

    http://activemq.apache.org/redelivery-policy.html

    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ActiveMQMessageConsumer;
    import org.apache.activemq.ActiveMQSession;
    import org.apache.activemq.RedeliveryPolicy;
    
    public class SimpleConsumerIndividualAcknowledge {
    
        public static void main(String[] args) throws JMSException {
            Connection conn = null;
            try {
                ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
                RedeliveryPolicy policy = new RedeliveryPolicy();
                policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
                cf.setRedeliveryPolicy(policy);
                conn = cf.createConnection();
                ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                        ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
                ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session
                        .createConsumer(session.createQueue("test"));
                consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        try {
                            //do your stuff
                            message.acknowledge();
                        } catch (Exception e) {
                            throw new RuntimeException(e);//ActiveMQMessageConsumer.rollback() is called automatically
                        }
                    }
                });
                conn.start();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (Exception e) {
                    }
                }
            }
        }
    }
    

    如果您想手动停止和重新启动消费者,请查看此处activemq-redelivery-does-not-work

    【讨论】:

      猜你喜欢
      • 2020-03-29
      • 1970-01-01
      • 2012-10-02
      • 1970-01-01
      • 2017-07-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-03-20
      相关资源
      最近更新 更多