【发布时间】:2017-03-22 18:03:40
【问题描述】:
我正在尝试实现以下场景并使用 JMS API。使用 DefaultMessageListenerContainer 接收消息
场景 - 我有一个发布者将消息发布到 RabbitMQ 交换和一个侦听器侦听 RMQ 队列以接收消息。 如果在侦听器上处理接收到的消息时出现异常,则该消息或消息组应回滚并放回同一队列中。
我正在使用属性“session transacted = true”通过本地事务来实现上述场景。但是,当出现异常时,我的消息没有回滚到原始队列。如果我在这里遗漏任何东西,有人可以帮忙。
Spring bean 配置如下。
<bean id="rmqConnectionFactory" class="com.rabbitmq.jms.admin.RMQConnectionFactory">
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="host" value="localhost"/>
<property name="port" value="5672"/>
<property name="virtualHost" value="/"/>
</bean>
<bean id="rmqDestination" class="com.rabbitmq.jms.admin.RMQDestination">
<property name="destinationName" value="testDestination"/>
<property name="amqpExchangeName" value="testExchange"/>
<property name="amqpRoutingKey" value="testRouting"/>
<property name="amqpQueueName" value="testDestination"/>
</bean>
<bean id="messageListener" class="org.jms.SimpleJMSMessageListener" />
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="rmqConnectionFactory"/>
<property name="destination" ref="rmqDestination"/>
<property name="messageListener" ref="messageListener" />
<property name="sessionTransacted" value="true"/>
</bean>
消息监听器如下:
public class SimpleJMSMessageListener implements MessageListener {
@Transactional
@Override
public void onMessage(Message message) {
int num[] = {1, 2, 3, 4};
if (message instanceof TextMessage) {
try {
String receivedMessage = ((TextMessage) message).getText();
logger.info("Received message via JMS message Listener-Consumer: {}", receivedMessage);
if(receivedMessage.endsWith("35")) {
int i = num[5];
logger.info("Error processing the message");
}
}
catch (JMSException ex) {
logger.error("error processing incoming jms message-Consumer", ex);
}
}
else {
logger.warn("Received non text jms message-Consumer {}", message);
}
}
}
【问题讨论】:
标签: java spring rabbitmq jms spring-jms