【发布时间】:2011-12-07 13:20:24
【问题描述】:
我需要将 JTA 事务与发送 JMS 消息同步 - MDB 应在客户端 JTA 事务提交后激活。 这在使用 XAConnectionFactory 时应该是可能的,但在我的示例中不起作用。
示例场景:
- Web 服务客户端发送代码 = 0 的消息
- mdb 接收消息并打印:START: code (NEW JTA TRANSACTION)
- mdb 增量代码并打印:SEND: %code + 1%
- mdb 发送带有新代码值的消息
- mdb 睡眠
- mdb 打印:END 代码
- mdb 完成(事务提交)
场景重复,直到代码
START: 0
SEND: 1
END: 0
START: 1
SEND: 2
END: 1
START: 2
SEND: 3
END: 2
etc..
但目前我得到:
...
START: 4
SEND: 5
END: 3
START: 5
SEND: 6
END: 4
START: 6
SEND: 7
END: 5
END: 6
我的代码:
-
网络服务客户端
@WebMethod @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) public void publish() { TestQueueUtil.sendToQueue(0); } -
TestQueueUtil(JMS 客户端)
public static void sendToQueue(Integer code) { InitialContext initialContext; XAQueueConnection queueConnection = null; XAQueueSession queueSession = null; try { Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); initialContext = new InitialContext(env); XAConnectionFactory queueConnectionFactory = (XAConnectionFactory) initialContext.lookup("jms/dsk/ConnectionFactoryXA"); queueConnection = (XAQueueConnection) queueConnectionFactory.createXAConnection(); queueConnection.start(); queueSession = queueConnection.createXAQueueSession(); Queue queue = (Queue) initialContext.lookup("jms/dsk/TestQueue"); //QueueSender sender = MessageProducer producer = queueSession.createProducer(queue); Message jmsMessage = queueSession.createMessage(); jmsMessage.setIntProperty("code", code); producer.send(jmsMessage); producer.close(); queueConnection.stop(); } catch (Exception e) { throw new RuntimeException("sendToQueue", e); } finally { if (queueSession != null) { try { queueSession.close(); } catch (Exception e) { //ignore } } if (queueConnection != null) { try { queueConnection.close(); } catch (Exception e) { //ignore } } } } -
TestQueueMDB
@MessageDriven(mappedName = "jms/dsk/TestQueue", activationConfig = { @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"), @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue") }) public class TestQueueMDB implements MessageListener { @Resource protected MessageDrivenContext messageDrivenContext; @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) public void onMessage(Message message) { Integer code = null; try { code = message.getIntProperty("code"); System.out.println("START: " + code); if (code < 10) { Integer newcode = code + 1; System.out.println("SEND: " + newcode); TestQueueUtil.sendToQueue(newcode); Thread.sleep(2000); } } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("END: " + code); } } }
我做错了什么?
【问题讨论】:
标签: jakarta-ee jms jta