【发布时间】:2014-08-13 17:40:17
【问题描述】:
我以前从未使用过activeMQ。我正在为我的项目做一些研究,我需要根据听众的逻辑重新传递消息。根据activemq's documentation,我是:
- 使用 Session.CLIENT_ACKNOWLEDGE 模式
- 根据逻辑,确认消息或执行 session.recover 以再次获取消息。
使用我当前的代码,我可以一次从 testQueue 中读取消息。 但是我的重新交付政策不起作用。我尝试设置初始延迟,但消息会自动传递。我希望消息被重新传递 5 次。那没有发生。
我正在使用以下代码创建连接工厂、连接、会话和队列:
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(0);
redeliveryPolicy.setMaximumRedeliveries(5);
redeliveryPolicy.setRedeliveryDelay(1000);
redeliveryPolicy.setQueue("*");
redeliveryPolicy.setBackOffMultiplier(2);
factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
factory.setRedeliveryPolicy(redeliveryPolicy);
connection = (ActiveMQConnection) factory.createConnection();
// connection.setRedeliveryPolicy(redeliveryPolicy);
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
String queueName = "testQueue"+(UUID.randomUUID());
System.out.println("Creating Queue: "+queueName);
testQueue = session.createQueue(queueName);
我尝试将 RedeliveryPolicy 设置为 ActiveMQConnectionFactory 和 ActiveMQConnection(对我不起作用。)
然后我在一个消费者中做以下设置消息监听器:
MessageConsumer consumer = session.createConsumer(testQueue);
consumer.setMessageListener(new MyMessageListener(id, session));
我的消息监听器:
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Consuming["+threadId+"] message: " + text);
if(text.contains("RD")){
System.out.println("acknowledgeThisMessage");
((com.sun.messaging.jms.Message)message).acknowledgeThisMessage();
} else{
session.recover();
}
}
else{
System.out.println("Consuming["+threadId+"] Weird Message: " + message);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
如果您需要更多信息,请告诉我,我可以更新帖子。提前致谢。
【问题讨论】: