【发布时间】:2018-11-03 13:52:53
【问题描述】:
JMS 1.1 规范第 4.4.11 节说:“承认 消费消息自动确认收到所有消息 已由其会话交付。”
但是,这不是我在 Solace 中观察到的行为。我编写了下面的 100 行程序,它发送 20 条消息,然后读取消息并在确认和删除它们之间交替。结果是所有偶数编号的消息都保留在队列中。
Solace 是否违反了 JMS 规范,还是我遗漏了什么?
package com.example;
import java.util.function.Predicate;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import com.solacesystems.jms.SolConnectionFactory;
import com.solacesystems.jms.SolJmsUtility;
import com.solacesystems.jms.SupportedProperty;
public class SolaceAckTest {
private static final String host = "localhost";
private static final String username = "MyUser";
private static final String password = "MyPassword";
private static final String COUNTER_PROPERTY_NAME = "MyCounter";
private static final String QUEUE_NAME = "MATCHED_1";
private static final int NUM_MESSAGES_TO_SEND = 20;
private static final long SENDING_INTERVAL_IN_MILLISECONDS = 100;
/**
* Determines on which messages we should call
* {@link Message#acknowledge()}.
*/
private static final Predicate<Message> SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE = new Predicate<Message>() {
@Override
public boolean test(Message m) {
try {
return (m.getIntProperty(COUNTER_PROPERTY_NAME) % 2) == 1;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
};
public static void main(String[] args) throws Exception {
SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setRespectTTL(true);
QueueConnection queueConnection = connectionFactory.createQueueConnection();
QueueSession queueSession = queueConnection.createQueueSession(false, SupportedProperty.SOL_CLIENT_ACKNOWLEDGE);
Destination requestDest = queueSession.createQueue(QUEUE_NAME);
queueSession.createConsumer(requestDest).setMessageListener(new MessageListenerThatAcknowledgesSomeMessages());
MessageProducer messageProducer = queueSession.createProducer(requestDest);
queueConnection.start();
for (int counter = 1; counter <= NUM_MESSAGES_TO_SEND; counter++) {
TextMessage msg = queueSession.createTextMessage();
msg.setText("Message #" + counter);
msg.setIntProperty(COUNTER_PROPERTY_NAME, counter);
messageProducer.send(msg);
Thread.sleep(SENDING_INTERVAL_IN_MILLISECONDS);
}
// Prevent the program from terminating.
Thread.sleep(1000);
}
/**
* A listener that calls {@link Message#acknowledge()} only on messages that
* meet the criteria specified by
* {@link SolaceAckTest#SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE}.
*/
private static class MessageListenerThatAcknowledgesSomeMessages implements MessageListener {
public MessageListenerThatAcknowledgesSomeMessages() {
}
@Override
public void onMessage(Message msg) {
try {
final String text = ((TextMessage) msg).getText();
if (SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE.test(msg)) {
msg.acknowledge();
System.out.println("Acknowledging message: " + text);
} else {
System.out.println("Not acknowledging message: " + text);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
【问题讨论】:
-
您是否尝试过使用真正的 JMS Session.CLIENT_ACKNOWLEDGE 会话选项而不是可能的专有 SOL 选项?
-
蒂姆,使用 Session.CLIENT_ACKNOWLEDGE 解决了这个问题。发布正式回复,以便我可以将您的回答归功于您。