【问题标题】:how to read large volume of messages from Websphere MQ如何从 Websphere MQ 读取大量消息
【发布时间】:2015-10-14 00:34:14
【问题描述】:

我想按顺序从 Websphere MQ 组中读取 10000 条消息,我正在使用下面的代码来做同样的事情,但是读取所有消息需要很长时间。即使我尝试使用多线程概念,但有时 2 个线程正在消耗相同的组并且发生竞争条件。下面是代码sn-p。 我正在尝试使用 3 个线程从 MQ 顺序读取 10000 条消息,但我的两个线程同时访问同一个组。如何避免这种情况?顺序读取大量消息的最佳方法是什么?我的要求是我想顺序阅读 10000 条消息。请帮忙。

MQConnectionFactory factory = new MQConnectionFactory();
factory.setQueueManager("QM_host")
MQQueue destination = new MQQueue("default");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

MessageConsumer lastMessageConsumer = 
    session.createConsumer(destination, "JMS_IBM_Last_Msg_In_Group=TRUE");
TextMessage lastMessage = (TextMessage) lastMessageConsumer.receiveNoWait();
lastMessageConsumer.close();

if (lastMessage != null) {

    int groupSize = lastMessage.getIntProperty("JMSXGroupSeq");
    String groupId = lastMessage.getStringProperty("JMSXGroupID");

    boolean failed = false;

    for (int i = 1; (i < groupSize) && !failed; i++) {

        MessageConsumer consumer = session.createConsumer(destination,
            "JMSXGroupID='" + groupId + "'AND JMSXGroupSeq=" + i);
        TextMessage message = (TextMessage)consumer.receiveNoWait();

        if (message != null) {
            System.out.println(message.getText());
        } else {
            failed = true;
        }

        consumer.close();

    }

    if (failed) {
        session.rollback();
    } else {
        System.out.println(lastMessage.getText());
        session.commit();
    }

}

connection.close();

【问题讨论】:

    标签: jms ibm-mq messaging mq


    【解决方案1】:

    我认为更好的方法是在您的应用程序中设置一个协调线程,该线程将侦听组的最后一条消息,并为每个组启动一个新线程以获取属于分配给该线程的组的消息。 (这将迎合比赛条件。)

    在获取属于一个组的消息的线程中,您不需要使用 for 循环来分别获取每条消息,而是应该获取属于该组的任何消息,同时维护一个组计数器并缓冲订单消息。只要您仅在接收和处理组的所有消息后提交会话,这将是安全的。 (这将产生更高的性能,因为每个组将由一个单独的线程处理,并且该线程只会在 MQ 中访问每条消息一次。)

    【讨论】:

      【解决方案2】:

      请参阅 IBM 在 sequential retrieval of messages 上的文档。如果页面移动或更改,我将引用最相关的部分。为了保证顺序处理,必须满足以下条件

      • 所有 put 请求均来自同一个应用程序。
      • 所有 put 请求都来自同一个工作单元,或者所有 put 请求都是在一个工作单元之外发出的。
      • 所有消息都具有相同的优先级。
      • 所有消息都具有相同的持久性。
      • 对于远程排队,配置是这样的,从应用程序发出 put 请求,通过它的 队列管理器,通过互通,到目的队列 管理器和目标队列。
      • 消息不会放入死信队列(例如,如果队列暂时已满)。
      • 获取消息的应用程序不会故意更改检索顺序,例如通过指定特定的 MsgId 或 CorrelId 或使用消息优先级。
      • 只有一个应用程序正在执行获取操作以从目标队列中检索消息。如果有多个 应用程序,这些应用程序必须设计为获得所有 发送应用程序放置的每个序列中的消息。

      虽然页面没有明确说明这一点,但当他们说“一个应用程序”时,其意思是该应用程序的单个线程。如果应用程序有并发线程,则无法保证处理顺序。

      此外,建议在单个工作单元中阅读 10,000 条消息作为保持消息顺序的方法!仅当这 10,000 条消息必须作为一个原子单元成功或失败时才这样做,这与它们是否按顺序接收无关。如果必须在单个工作单元中处理大量消息,则绝对有必要调整日志文件的大小,并且很可能还有一些其他参数。保留序列顺序对于任何线程异步消息传输来说已经足够折磨了,而不会引入运行很长时间的大量事务。

      【讨论】:

        【解决方案3】:

        您可以使用适用于 Java(非 JMS)的 MQ 类来做您想做的事,使用适用于 JMS 的 MQ 类可能会做您想做的事,但确实很棘手。

        首先从 MQ 知识中阅读此 page

        我将伪代码(来自上面的网页)转换为 Java 的 MQ 类,并将其从浏览更改为破坏性获取。

        另外,我更喜欢在同步点下处理每组消息(假设组大小合理)。

        首先,您缺少 GMO (GetMessageOptions) 的“选项”字段的几个标志,并且 MatchOptions 字段需要设置为“MQMO_MATCH_MSG_SEQ_NUMBER”,以便所有线程始终获取组中的第一条消息第一条消息。即不要像上面所说的那样为第一条消息抓取组中的第二条消息。

        MQGetMessageOptions gmo = new MQGetMessageOptions();
        MQMessage rcvMsg = new MQMessage();
        
        /* Get the first message in a group, or a message not in a group */
        gmo.Options = CMQC.MQGMO_COMPLETE_MSG | CMQC.MQGMO_LOGICAL_ORDER | CMQC.MQGMO_ALL_MSGS_AVAILABLE | CMQC.MQGMO_WAIT | CMQC.MQGMO_SYNCPOINT;
        gmo.MatchOptions = CMQC.MQMO_MATCH_MSG_SEQ_NUMBER;
        rcvMsg.messageSequenceNumber = 1;
        inQ.get(rcvMsg, gmo);
        
        /* Examine first or only message */
         ...
        
        gmo.Options = CMQC.MQGMO_COMPLETE_MSG | CMQC.MQGMO_LOGICAL_ORDER | CMQC.MQGMO_SYNCPOINT;
        do while ((rcvMsg.messageFlags & CMQC.MQMF_MSG_IN_GROUP) == CMQC.MQMF_MSG_IN_GROUP)
        {
            rcvMsg.clearMessage();
            inQ.get(rcvMsg, gmo);
           /* Examine each remaining message in the group */
           ...
        }
        qMgr.commit();
        

        【讨论】:

          猜你喜欢
          • 2011-03-28
          • 2013-10-22
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2012-11-05
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多