【问题标题】:Slow MQ Topic Subscription. Parallelization not improving performance缓慢的 MQ 主题订阅。并行化没有提高性能
【发布时间】:2017-01-19 16:57:36
【问题描述】:

我正在执行一些通配符订阅(例如 /A/# 和 /B/#)。每个订阅(请参阅下面的createSubscriber(topic))会产生大约 1000 个主题,并且需要大约 10 秒才能返回。 10 秒是合理的响应时间吗?这对我来说似乎很慢,但我没有什么可比的。

给定下面的代码;

public class JMSClientSubscriber implements Runnable {

    TopicConnection           topicCon;
    Properties                properties;
    List<MyListener>          listeners;
    JmsTopicConnectionFactory jcf;
    boolean                   connected, alive;

    public JMSClientSubscriber() throws JMSException {
            properties = Properties.getInstance();
            listeners = new LinkedList<>();
            jcf = FLOWConnectionFactory.getTopicFactory(properties, Location.CLIENT);
            connected = false;
            alive = true;
    }

    @Override
    public void run() {
            try {
                    connect();
                    while (alive) {
                            Thread.sleep(1000);
                    }
                    disconnect();
            } catch (Exception e) {
                    e.printStackTrace();
            }
    }

    void connect() throws Exception {
            connected = false;
            topicCon = jcf.createTopicConnection();

            topicCon.setExceptionListener(new ExceptionListener() {
                    @Override public void onException(JMSException arg0) {
                            disconnect();
                            try {
                                    Thread.sleep(1000);
                                    connect();
                            } catch (Exception e) {
                                    e.printStackTrace();
                            }
                    }
            });

            topicCon.start();

            for (MyListener listener: listeners) { 
                    Thread t = new Thread() {
                            @Override public void run() {
                                    TopicSession topicSes;
                                    try {
                                            topicSes = topicCon.createTopicSession(false, Session.DUPS_OK_ACKNOWLEDGE);
                                            Topic topic = topicSes.createTopic(listener.exampleMessage.getTopicSubscription());
                                            System.out.println(new Date() + " Subscribing to " + topic);
    /* THIS TAKES 10 SECONDS! */            TopicSubscriber topicSub = topicSes.createSubscriber(topic);
                                            System.out.println(new Date() + " Subscription finished " + topic);
                                            topicSub.setMessageListener(listener);
                                    } catch (Exception e) {
                                            e.printStackTrace();
                                    }
                            }
                    };
                    t.start();
            }
            connected = true;
    }

    void disconnect() {
            try {
                    connected = false;
                    if (topicCon != null) topicCon.close();
            } catch (JMSException e) {}    
    }

    public void stop() { alive = false; }

    public class MyListener implements MessageListener {           
            Class<? extends FlowMessage>       expectedClass;
            FlowMessage                        exampleMessage;

            public MyListener(Class<? extends FlowMessage> expectedClass) throws Exception {
                    this.expectedClass = expectedClass;
                    exampleMessage = expectedClass.newInstance();
                    listeners.add(this);
            }

            @Override
            public void onMessage(javax.jms.Message arg0) {
                    BytesMessage bm = (BytesMessage) arg0;

                    try {
                            byte bytes[] = new byte[(int) bm.getBodyLength()];
                            bm.readBytes(bytes);
                            FlowMessage flowMessage = exampleMessage.newInstance(bytes);
                            System.out.println(new Date() + "[" + bm.getJMSDestination() + "] " + flowMessage.toString());

                    } catch (Exception e) {
                            e.printStackTrace();
                    }
            }
    }


    public static void main(String[] args) throws Exception {
            Properties properties = Properties.newInstance(new File("D:\\cc_views\\D253570_ALL_FLOW_DEV\\DealingRoom\\FLOW\\src\\cfg\\flow.properties"));
            LogManager.getLogManager().readConfiguration(new FileInputStream(properties.getPropertyAsFile("logging.properties")));

            /* Thread per connection */
            for (Class<FlowMessage> clazz: new Class[] { KondorCpty.class, KondorPair.class }) {
                    JMSClientSubscriber s = new JMSClientSubscriber();
                    s.new MyListener(clazz);
                    new Thread(s).start();
            }

            /* Thread per session */
            JMSClientSubscriber s = new JMSClientSubscriber();
            s.new MyListener(KondorCpty.class);
            s.new MyListener(KondorPair.class);
            new Thread(s).start();

    }

}

此代码中的main 运行两个测试;

一个连接+多线程/会话

Tue Sep 13 10:18:50 2016 Subscribing to topic://DRS/OW/Cpty/#
Tue Sep 13 10:18:50 2016 Subscribing to topic://DRS/OW/Pair/#
Tue Sep 13 10:19:00 2016 Subscription finished topic://DRS/OW/Cpty/#
Tue Sep 13 10:19:07 2016 Subscription finished topic://DRS/OW/Pair/#
Tue Sep 13 10:19:08 2016[topic://DRS/OW/Pair/RONGBP] KondorPair 

多线程连接+每个线程/连接一个会话

Tue Sep 13 10:22:42 2016 Subscribing to topic://DRS/OW/Pair/#
Tue Sep 13 10:22:42 2016 Subscribing to topic://DRS/OW/Cpty/#
Tue Sep 13 10:22:52 2016 Subscription finished topic://DRS/OW/Cpty/#
Tue Sep 13 10:23:00 2016 Subscription finished topic://DRS/OW/Pair/#
Tue Sep 13 10:23:00 2016[topic://DRS/OW/Pair/RONGBP] KondorPair

两种测试在时间和行为方面都是相同的。

  • 订阅约 1000 个主题需要约 10 秒
  • 订阅似乎是按顺序运行的,即使它们位于不同的线程中。
  • 主题更新仅在所有订阅完成后才会出现。
  • 在订阅之前或之后使用 TopicConnection.start() 不会影响性能或第一次主题更新到达时。

那么我该如何加快速度呢?

【问题讨论】:

    标签: java performance jms ibm-mq mq


    【解决方案1】:

    请注意以下几点:

    1) 对于每个 createSession(队列或主题)方法调用,数据都来自客户端和队列管理器,用于设置 JMS 会话环境。您正在使用远程连接,这意味着涉及网络上的数据流。

    2) createSubscriber 方法调用涉及创建订阅对象、除了主题查找之外的临时队列、队列管理器端的权限验证等。

    您能告诉我们您是如何并行化连接/会话的吗?

    根据 JMS 规范,会话不应跨线程共享。我会为每个订阅者指定一个线程

    1) 创建 JMS 连接

    2) 创建一个 JMS 会话

    3) 创建订阅者

    4) JMS Connection.start() 是否开始消息传递。

    【讨论】:

    • 您好,感谢您抽出宝贵时间回答。我已经包含了完整的代码。
    • 我的第一个建议是在创建订阅者并设置监听器之后移动 topicCon.start()。这样可以确保在调用 connection.start() 以告诉消息传递提供程序开始传递消息时,侦听器已准备好接收消息。
    • 我试过这个。不用找了。仍然 (a) 每个订阅需要 10 秒才能返回,并且 (b) 只有在最终订阅完成后才会收到第一个主题更新。 (仅供参考 - 我正在使用带有 Java8 的 MQClient 7.5 JAR)。
    • 我会尝试执行你的代码并在我有结果时更新
    • 嗨,我认为线程的弊大于利。我先运行TopicConnection.start(),然后在for 循环中运行createSubscriber。此处,MessageListener 正在读取第一个订阅的主题更新,而下一个 createSubscriber 正在运行。
    【解决方案2】:

    问题出在onMessage。我没有在这里处理消息,而是将消息放在BlockingQueue 上。然后,许多单独的线程轮询此BlockingQueue。这极大地提高了MessageListener 的吞吐量,并将多线程问题从 JMS/MQ 代码中移开。

    【讨论】:

      猜你喜欢
      • 2021-10-04
      • 2020-03-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-01-11
      • 1970-01-01
      相关资源
      最近更新 更多