【发布时间】:2012-01-24 11:56:35
【问题描述】:
我有一个用 Java 编写的单线程 ActiveMQ 消费者。我要做的就是从队列中接收()一条消息,尝试将其发送到 Web 服务,如果成功则确认()它。如果 Web 服务调用失败,我希望消息保留在队列中并在超时后重新发送。
除了重新发送部分外,它或多或少都在工作:每次我重新启动消费者时,它都会为仍在队列中的每个消息获取一条消息,但在发送失败后,这些消息永远不会重新发送。
我的代码如下:
public boolean init() throws JMSException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
connectionFactory.setRedeliveryPolicy(policy);
connectionFactory.setUseRetroactiveConsumer(true); // ????
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
destination = session.createQueue(subject); //???
consumer = session.createConsumer(destination);
//consumer.setMessageListener(this); // message listener had same behaviour
}
private void process() {
while(true) {
System.out.println("Waiting...");
try {
Message message = consumer.receive();
onMessage(message);
} catch (JMSException e) {
e.printStackTrace();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void onMessage(Message message) {
System.out.println("onMessage");
messagesReceived++;
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
if(!client.sendMessage(msg)) {
System.out.println("Webservice call failed. Keeping message");
//message.
} else {
message.acknowledge();
}
if (transacted) {
if ((messagesReceived % batch) == 0) {
System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived);
session.commit();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
我目前没有使用交易(也许我应该使用?)。
我确定我错过了一些简单的东西,很快就会拍我的额头,但我似乎无法弄清楚这应该如何工作。谢谢!
编辑:因为没有足够的代表,我自己无法回答:
好的,经过更多的实验,事实证明事务是做到这一点的唯一方法。这是新代码:
public boolean init() throws JMSException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(1000L);
policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
connectionFactory.setRedeliveryPolicy(policy);
connectionFactory.setUseRetroactiveConsumer(true);
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ActiveMQSession.CLIENT_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
@Override
public void onMessage(Message message) {
System.out.println("onMessage");
messagesReceived++;
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
if(client.sendMessage(msg)) {
if(transacted) {
System.out.println("Call succeeded - committing message");
session.commit();
}
//message.acknowledge();
} else {
if(transacted) {
System.out.println("Webservice call failed. Rolling back message");
session.rollback();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
现在,按照重新投递政策的规定,每 1000 毫秒重新发送一次消息。
希望这对其他人有帮助! :)
【问题讨论】:
-
确实很有帮助。谢谢!
-
哇,这救了我。显然,activemq 通常会在 10 秒左右将事情“放回队列”,除非您像以前那样指定 1。
-
对初学者很有帮助。尤其是为什么你提到
1000ms。 -
如果使用单独的确认模式(就像你原来的那样),连接必须关闭,否则 ActiveMQMessageConsumer 会认为它是重复的并自动忽略它。
-
@MartinSerrano - 谢谢!这就解释了为什么我转发的消息没有送达!