【问题标题】:Receiving number of messages from a queue从队列接收消息数
【发布时间】:2012-11-16 00:37:09
【问题描述】:

我正在接收来自 Active MQ 队列的消息。

有没有办法一次接收多条消息?还是必须通过循环来完成?

此外,如果我想要说 30 条消息运行一个过程,并且只有该过程有效,则为所有消息返回一个 message.acknowledge();

我的意思是,如果程序失败,我不想从队列中删除这 30 个。

谢谢。

【问题讨论】:

  • 为什么不把这 30 个放到一个消息映射中,然后将那个消息映射发送到队列中呢?那么你可以一次性处理,而不是等待30条消息单独到达,你不能保证订单。

标签: java activemq message-queue


【解决方案1】:

您必须循环执行。通常,最好使用消息驱动的 bean 来消费消息,但它不适合这种情况,因为它们逐条接收消息并且您无法指定确切的数量。因此,使用MessageConsumer 和手动事务:

@Resource
UserTransaction utx;

@Resource(mappedName="jms/yourConnectionFactory");
ConnectionFactory cf;

@Resource(mappedName="jms/yourQueue");
Queue queue;

..    
Connection conn = null;
Session s = null;
MessageConsumer mc = null;
try {
    utx.begin();
    conn = cf.createConnection();
    s = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); //TRANSACTIONAL SESSION!
    mc = s.createConsumer(queue);
    conn.start(); // START CONNECTION'S DELIVERY OF INCOMING MESSAGES
    for(int i=0; i<30; i++)
    {
          Message msg = mc.receive();
          //BUSINESS LOGIC
    }
    utx.commit();
} catch(Exception ex) {
  ..
} finally { //CLOSE CONNECTION, SESSION AND MESSAGE CONSUMER
}

【讨论】:

    【解决方案2】:

    我没有任何 ActiveMQ 经验。但我认为对于队列监听器,基本逻辑应该与队列实现无关。

    对于您的第一个问题,我不知道从队列中检索多条消息的任何方法。我认为最好的方法是在循环中一个一个地获取它。

    对于您的第二个问题,在读取消息的底层事务提交之前,消息不会从队列中丢弃。因此,您可以在单个事务中读取整批消息,并在出现错误时将其回滚。它不应该从队列中删除任何现有消息。

    请问您为什么需要 30 条消息来运行一个程序。通常我们使用队列的时候,每条消息都应该能够独立处理。

    【讨论】:

    • 我想要大量用于 SQL 连接优化的消息,我将这些消息放入数据库中
    • 好的。假设您的 30 条消息中有 1 条格式错误。因此,在这种情况下,您将无法处理 30 条消息中的任何一条。由于此消息引发的异常会回滚所有 30 条消息的整个事务。
    • 您的应用程序是否保证了消息的频率?假设在 10 秒内收到 29 条消息,又花了 2 分钟收到第 30 条消息(例如:请记住网络延迟是不可预测的)。前 29 条消息将在 2 分钟内不被处理。这不会影响您的应用程序的吞吐量吗?
    猜你喜欢
    • 1970-01-01
    • 2010-09-19
    • 2011-07-10
    • 2012-01-19
    • 1970-01-01
    • 2014-07-08
    • 2013-12-06
    • 1970-01-01
    • 2011-07-15
    相关资源
    最近更新 更多