【问题标题】:ActiveMQ batch consumerActiveMQ 批量消费者
【发布时间】:2021-12-02 10:42:42
【问题描述】:

我需要使用来自 ActiveMQ 主题的消息并将它们保存在 mongo 中。我想知道是否有一种方法/配置可以从主题中批量消费消息,而不是逐条读取消息并为每条消息进行数据库调用。

我在想象最终解决方案将执行以下操作:

  1. 以 100 个批量消耗消息
  2. 使用 mongo bulk insert 将批处理保存到 DB 中
  3. 向代理发送成功插入消息的 ACK 和失败消息的 NAK。

【问题讨论】:

    标签: java jms activemq spring-boot-actuator


    【解决方案1】:

    JMS API 仅允许您一次接收一条消息,无论是通过异步 javax.jms.MessageListener 还是通过 JMS 1.1 中的 javax.jms.MessageConsumer#receive() 或 JMS 2 中的 javax.jms.JMSConsumer.receive() 同步调用。但是,您可以批量接收多条消息使用事务处理会话。以下是the javax.jms.Session JavaDoc 对事务会话的评价:

    可以将会话指定为已处理。每个事务会话支持单个系列事务。每个事务将一组消息发送和一组消息接收组合成一个原子工作单元。实际上,事务将会话的输入消息流和输出消息流组织成一系列原子单元。当一个事务提交时,它的原子输入单元被确认并且它的相关原子输出单元被发送。如果事务回滚完成,事务发送的消息将被销毁,会话的输入会自动恢复。

    因此,您可以使用事务处理会话单独接收 100 条消息,将该数据插入 Mongo,提交事务处理会话,或者如果出现故障,您可以回滚事务处理会话(这基本上充当否定确认)。例如:

    final int TX_SIZE = 100;
    ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection connection = cf.createConnection();
    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Topic topic = session.createTopic("myTopic");
    MessageConsumer consumer = session.createConsumer(topic);
    connection.start();
    while (true) {
       List messages = new ArrayList<Message>();
       for (int i = 0; i < TX_SIZE; i++) {
          Message message = consumer.receive(1000);
          if (message != null) {
             messages.add(message);
          } else {
             break; // no more messages available for this batch
          }
       }
    
       if (messages.size() > 0) {
          try {
             // bulk insert data from messages List into Mongo
             session.commit();
          } catch (Exception e) {
             e.printStackTrace();
             session.rollback();
          }
       } else {
          break; // no more messages in the subscription
       }
    }
    

    值得注意的是,如果您只使用 JMS 事务会话而不是完整的 XA 事务,那么在 Mongo 中至少会有一些重复的风险(例如,如果您的应用程序在成功将数据插入 Mongo 之后但在提交事务会话之前崩溃)。 XA 事务会为您减轻这种风险,但会增加相当多的复杂性,具体取决于您的环境。

    最后,如果您遇到 ActiveMQ“经典”的性能限制,请考虑使用来自 ActiveMQ 的下一代消息代理 ActiveMQ Artemis

    【讨论】:

      【解决方案2】:

      @Nabeel Ahmad 你可能有兴趣在 ActiveMQ 中查看Virtual Topics。它们提供了在生产者端使用主题的能力,然后使用队列进行消费。当想要扩展消费时,它们非常有用,因为使用队列比消费者方面的主题拥有更多的功能和可观察性。

      将此配置添加到activemq.xml

      <destinationInterceptors> 
        <virtualDestinationInterceptor> 
          <virtualDestinations> 
            <virtualTopic name="VT.>" prefix="VQ.*." selectorAware="false"/>   
          </virtualDestinations>
        </virtualDestinationInterceptor> 
      </destinationInterceptors>
      

      然后让生产者发送到:topic://VT.DATA

      然后让消费者接收来自:queue://VQ.CLIENT1.VT.DATA

      正如@Justin Bertram 所提到的,批量读取可以使用事务处理会话完成,并且每 100 条左右的消息提交一次。

      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
      MessageConsumer messageConsumer = session.createConsumer(session.createQueue("VQ.CLIENT1.VT.DATA");
      
      Message message = null;
      long count = 0l;
      do {
        message = messageConsumer.receive(2000l);
        
        if(message != null) {
           // check the message and publisher.send() to .DLQ if it is bad
      
           // if message is good, send to Mongo
      
           if(count % 100 == 0) {
              // commit every 100 messages on the JMS-side
              session.commit();
           }
        }
      } while(message != null);
      

      【讨论】:

        猜你喜欢
        • 2021-04-24
        • 2014-08-06
        • 1970-01-01
        • 1970-01-01
        • 2012-04-18
        • 2012-12-24
        • 1970-01-01
        • 2018-04-27
        • 2021-03-06
        相关资源
        最近更新 更多