【问题标题】:Spring JMSTemplate receive all messages in one transactionSpring JMSTemplate 在一个事务中接收所有消息
【发布时间】:2013-07-30 12:05:48
【问题描述】:

我正在尝试使用 Spring JMSTemplate.receive(String) 方法以同步模式从队列中获取所有消息。

问题是我总是只收到一条消息。代码如下:

@Transactional
public List<Message> receiveAllFromQueue(String destination) {
  List<Message> messages = new ArrayList<Message>();
  Message message;
  while ((message = queueJmsTemplate.receive(destination)) != null) {
    messages.add(message);
  }
  return messages;
}

如果我删除 @Transactional 注释,我会收到所有消息,但所有消息都在事务之外完成,因此如果稍后在处理这些消息期间出现异常,消息将丢失。

这是我的 JMSTemplate bean 的定义。

<bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="pubSubDomain" value="false" />
    <property name="receiveTimeout" value="1" />
   <property name="sessionTransacted" value="true" />
</bean>

我想要实现的是有一个事务,并且在这个事务中我想获取所有待处理的消息。

【问题讨论】:

    标签: spring transactions jms jmstemplate


    【解决方案1】:

    JmsTemplate的receive方法每次都会创建一个新的MessageConsumer。 第二次,您的事务尚未提交,Spring 将在第一次接收期间预取一些消息。 那时没有消息要提取,导致您的接听电话为空。

    Spring 中的JmsTemplate 有一个以SessionCallback 作为参数的execute 方法。这允许您针对 JmsTemplate 的底层会话运行自己的代码。 只创建一个 MessageConsumer 应该可以解决您的问题。

    @Transactional
    public List<Message> receiveAllFromQueue(String destination) {
        return jmsTemplate.execute(session -> {
            try (final MessageConsumer consumer = session.createConsumer(session.createQueue(destination))) {
                List<Message> messages = new ArrayList<>();
                Message message;
                while ((message = consumer.receiveNoWait()) != null) {
                    messages.add(message);
                }
                return messages;
            }
        }, true);
    }
    

    【讨论】:

      【解决方案2】:

      我会回复自己。看起来 JMSTemplate 不支持它。暂时解决它的唯一方法是扩展 JMSTemplate 并添加使用部分 JMSTemplate 的新方法。不幸的是,有些方法是私有的,所以它们需要被复制......

      public class CustomQueueJmsTemplate extends JmsTemplateDelegate {
      
        public List<Message> receiveAll(String destinationName) {
          return receiveAll(destinationName, null);
        }
      
        public List<Message> receiveAll(final String destinationName, final String messageSelector) {
          return execute(new SessionCallback<List<Message>>() {
            @Override
            public List<Message> doInJms(Session session) throws JMSException {
              Destination destination = resolveDestinationName(session, destinationName);
              return doReceiveAll(session, destination, messageSelector);
            }
          }, true);
        }
      
        private List<Message> doReceiveAll(Session session, Destination destination, String messageSelector)
            throws JMSException
        {
          return doReceiveAll(session, createConsumer(session, destination, messageSelector));
        }
      
        private List<Message> doReceiveAll(Session session, MessageConsumer consumer) throws JMSException {
          try {
            // Use transaction timeout (if available).
            long timeout = getReceiveTimeout();
            JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
                .getResource(getConnectionFactory());
            if (resourceHolder != null && resourceHolder.hasTimeout()) {
              timeout = resourceHolder.getTimeToLiveInMillis();
            }
      
            // START OF MODIFIED CODE
            List<Message> messages = new ArrayList<>();
            Message message;
            while ((message = doReceive(consumer, timeout)) != null) {
              messages.add(message);
            }
            // END OF MODIFIED CODE
      
            if (session.getTransacted()) {
              // Commit necessary - but avoid commit call within a JTA transaction.
              if (isSessionLocallyTransacted(session)) {
                // Transacted session created by this template -> commit.
                JmsUtils.commitIfNecessary(session);
              }
            } else if (isClientAcknowledge(session)) {
              // Manually acknowledge message, if any.
              for (Message retrievedMessages : messages) {
                retrievedMessages.acknowledge();
              }
            }
            return messages;
          }
          finally {
            JmsUtils.closeMessageConsumer(consumer);
          }
        }
      
        private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException {
          if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {
            return consumer.receiveNoWait();
          } else if (timeout > 0) {
            return consumer.receive(timeout);
          } else {
            return consumer.receive();
          }
        }
      
      }
      

      【讨论】:

        猜你喜欢
        • 2014-01-29
        • 2015-12-03
        • 2013-02-27
        • 1970-01-01
        • 1970-01-01
        • 2012-04-05
        • 1970-01-01
        • 2021-08-22
        • 2019-08-19
        相关资源
        最近更新 更多