【发布时间】:2017-07-15 07:44:55
【问题描述】:
我正在开发系统,将要处理的消息推送到 ActiveMQ 中。我有严格的要求,消费者必须按传入顺序处理消息。如果消费者的消息处理失败,它需要回滚/恢复并继续无限重试。只有当一条消息处理成功时,消费者才需要提交并继续下一条消息。
如何防止回滚消息自动转发到 DLQ 以及针对此类需求配置重新传递策略的正确方法是什么?
【问题讨论】:
标签: activemq
我正在开发系统,将要处理的消息推送到 ActiveMQ 中。我有严格的要求,消费者必须按传入顺序处理消息。如果消费者的消息处理失败,它需要回滚/恢复并继续无限重试。只有当一条消息处理成功时,消费者才需要提交并继续下一条消息。
如何防止回滚消息自动转发到 DLQ 以及针对此类需求配置重新传递策略的正确方法是什么?
【问题讨论】:
标签: activemq
当设置 RedeliveryPolicy 无限重试时,消息将永远不会发送到 DLQ。
policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
使用ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE,您可以一一确认消息。
http://activemq.apache.org/redelivery-policy.html
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
public class SimpleConsumerIndividualAcknowledge {
public static void main(String[] args) throws JMSException {
Connection conn = null;
try {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
cf.setRedeliveryPolicy(policy);
conn = cf.createConnection();
ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session
.createConsumer(session.createQueue("test"));
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
//do your stuff
message.acknowledge();
} catch (Exception e) {
throw new RuntimeException(e);//ActiveMQMessageConsumer.rollback() is called automatically
}
}
});
conn.start();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (conn != null) {
try {
conn.close();
} catch (Exception e) {
}
}
}
}
}
如果您想手动停止和重新启动消费者,请查看此处activemq-redelivery-does-not-work
【讨论】: