【发布时间】:2012-03-20 17:09:59
【问题描述】:
是否有任何 API 可以在不使用监控管理工具的情况下从 JMS 队列中删除消息。
【问题讨论】:
是否有任何 API 可以在不使用监控管理工具的情况下从 JMS 队列中删除消息。
【问题讨论】:
您可能需要一个 QueueBrowser 对象。我认为它有一个删除方法(或类似的)
【讨论】:
没有用于删除消息的直接 API。您可以调用 queueReceiver.receive() 方法从队列中删除消息。 QueueBrowser 不会从队列中删除消息。
【讨论】:
这是我可以开始工作的事情:
我假设您要删除符合特定条件的消息。
Queue destination = session.createQueue("your_q");
QueueBrowser browser = session.createBrowser(destination);
Enumeration<?> enum1 = browser.getEnumeration();
while(enum1.hasMoreElements())
{
TextMessage msg = (TextMessage)enum1.nextElement();
if(msg.getStringProperty("any_prop").equals("some_prop"))
{
MessageConsumer consumer = session.createConsumer(destination, "id='" + msg.getStringProperty("id") + "'");
consumer.receive(1000);
}
}
【讨论】:
这是我在 weblogic 上实际测试成功的东西:
destinationJNDI 应该包含队列的 JNDI 名称,例如“CommonJmsServer1@jms.jndi.dq.NL_Notifications.NLNotificationReprocessQ”。
ctx 应该是一个有效的 InitialContext,例如:
Properties env = new Properties();
env.put(javax.naming.Context.PROVIDER_URL, PROVIDER_URL);
env.put(Context.SECURITY_PRINCIPAL, WL_USER);
env.put(Context.SECURITY_CREDENTIALS, WL_PASSWORD);
env.put(Context.INITIAL_CONTEXT_FACTORY, WL_INITIAL_CONTEXT_FACTORY);
InitialContext ctx = new InitialContext(env);
idlist 应该是您要删除的消息 ID 的 CSV 列表
这是代码:
javax.jms.Queue queue = (javax.jms.Queue) ctx.lookup(destinationJNDI.toString());
// lookup the queue connection factory
QueueConnectionFactory queueConnFactory = (QueueConnectionFactory) ctx.lookup(WEBLOGIC_JMS_XA_CONNECTION_FACTORY);
// create a queue connection
QueueConnection queueConn = queueConnFactory.createQueueConnection();
queueConn.start();
// create a queue session
Session queueSession = queueConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
for (String id : idlist.split(",")) {
if (id.startsWith("ID:") ) {
MessageConsumer consumer = queueSession.createConsumer(queue, " JMSMessageID='" + id + "'");
Message message = consumer.receive(1000);
out.write("message = " + message + " ");
out.write("deleted ID " + id + " ");
}
}
queueSession.close();
queueConn.close();
【讨论】:
我必须另外调用session.commit() 让消费者删除消息。
此外,receivenowait API 不起作用;请致电receive(1000)。
这是我在jboss 上编写的一段工作代码:
try {
connection = connectionFactory.createConnection();
session = connection.createSession(true,-1);
Queue queue = (Queue) QueueConnectionFactory.getInitialContext().lookup("/queue/DLQ");
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> enum1 = browser.getEnumeration();
while(enum1.hasMoreElements()) {
TextMessage msg = (TextMessage)enum1.nextElement();
MessageConsumer consumer = session.createConsumer(queue, "JMSMessageID='" + msg.getJMSMessageID() + "'");
//You can try starting the connection outside while loop as well, I think I started it inside while loop by mistake, but since this code worked I am hence letting you know what worked
connection.start();
Message message = consumer.receive(1000) ;
if ( message != null ) {
//do something with message
}
}
}
finally {
session.commit();
consumer.close();
browser.close();
session.close();
connection.close();
}
【讨论】:
我只想完成上面的解释,如果你使用事务,那么你需要提交和接收(1000)。如果您不使用事务处理,则在没有提交的情况下接收 nowait。
【讨论】: