【问题标题】:Transaction management using RabbitMQ and JMS API使用 RabbitMQ 和 JMS API 进行事务管理
【发布时间】:2017-03-22 18:03:40
【问题描述】:

我正在尝试实现以下场景并使用 JMS API。使用 DefaultMessageListenerContainer 接收消息

场景 - 我有一个发布者将消息发布到 RabbitMQ 交换和一个侦听器侦听 RM​​Q 队列以接收消息。 如果在侦听器上处理接收到的消息时出现异常,则该消息或消息组应回滚并放回同一队列中。

我正在使用属性“session transacted = true”通过本地事务来实现上述场景。但是,当出现异常时,我的消息没有回滚到原始队列。如果我在这里遗漏任何东西,有人可以帮忙。

Spring bean 配置如下。

<bean id="rmqConnectionFactory" class="com.rabbitmq.jms.admin.RMQConnectionFactory">
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="host" value="localhost"/>
    <property name="port" value="5672"/>
    <property name="virtualHost" value="/"/>
</bean>

<bean id="rmqDestination" class="com.rabbitmq.jms.admin.RMQDestination">
    <property name="destinationName" value="testDestination"/>
    <property name="amqpExchangeName" value="testExchange"/>
    <property name="amqpRoutingKey" value="testRouting"/>
    <property name="amqpQueueName" value="testDestination"/>
</bean>


<bean id="messageListener" class="org.jms.SimpleJMSMessageListener" />
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="rmqConnectionFactory"/>
    <property name="destination" ref="rmqDestination"/>
    <property name="messageListener" ref="messageListener" />
    <property name="sessionTransacted" value="true"/>
</bean>

消息监听器如下:

public class SimpleJMSMessageListener implements MessageListener {
    @Transactional
    @Override
    public void onMessage(Message message) {

        int num[] = {1, 2, 3, 4};

        if (message instanceof TextMessage) {
            try {
                String receivedMessage = ((TextMessage) message).getText();
                logger.info("Received message via JMS message Listener-Consumer: {}", receivedMessage);
                if(receivedMessage.endsWith("35")) {
                    int i = num[5];
                    logger.info("Error processing the message");
                               }
            }
            catch (JMSException ex) {
                logger.error("error processing incoming jms message-Consumer", ex);
            }
        }
        else {
            logger.warn("Received non text jms message-Consumer {}", message);
        }
    }

}

【问题讨论】:

    标签: java spring rabbitmq jms spring-jms


    【解决方案1】:

    我们一直在寻找类似的东西,我怀疑这个问题与 Spring DefaultMessageListenerContainer 处理事务的方式有关。特别是 Spring 论坛中的这句话(此处为 http://forum.spring.io/forum/other-spring-related/remoting/24208-what-s-the-best-practice-for-using-jms-in-spring):

    关于事务管理:如果你在 DefaultMessageListenerContainer 上指定一个“transactionManager”,它将把它的接收循环包装在一个事务中。这通常与 JtaTransactionManager 一起使用,其中实际上没有绑定事务资源:它只是将线程(和侦听器的 JMS 会话)真正标记为“XA-active”并等待某些事情发生。对于任何体面的 JTA 提供者来说,这应该是非常有效的。

    这意味着,如果您在 DMLC 上指定事务管理器,那么您实际上不会有一个事务包装您的 onMessage - 即使它被注释为 @Transactional。此外,虽然使用 JtaTransactionManager 是“典型的”,但这可能是因为 JMS 事务通常是分布式的,并且需要 XA。如果您不需要 XA,我相信您也可以使用 Spring 的 JmsTranactionManager。

    因此,在您的情况下,请查看定义事务管理器 bean,并将其作为 DMLC 配置中的事务管理器引用。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-06-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-08-04
      • 2016-05-17
      • 2013-07-30
      • 1970-01-01
      相关资源
      最近更新 更多