【问题标题】:Single queue: concurrent message processing with multiple consumers单队列:多消费者并发消息处理
【发布时间】:2013-06-09 08:24:53
【问题描述】:

我是 jms 的新手。目标是在异步侦听器的 onMessage 方法中同时处理来自队列的消息,方法是将侦听器实例附加到多个使用者,每个使用者使用自己的会话并在单独的线程中运行,这样消息就会传递给不同的使用者并发处理。

1) 是否可以通过创建多个消费者同时处理来自单个队列的消息? 2)我想出了下面的代码,但想了解一下下面的代码是否适合我想要完成的任务。

public class QueueConsumer implements Runnable, MessageListener {

public static void main(String[] args) {




    QueueConsumer consumer1 = new QueueConsumer();
    QueueConsumer consumer2 = new QueueConsumer();
    try {
        consumer1.init("oms", "US.Q.CHECKOUT-ORDER.1.0.JSON");
        consumer2.init("oms","US.Q.CHECKOUT-ORDER.1.0.JSON");
    } catch (JMSException ex) {
        ex.printStackTrace();
        System.exit(-1);
    }


    Thread newThread1 = new Thread(consumer1);
    Thread newThread2 = new Thread(consumer1);
    newThread1.start();
    newThread2.start();



}


private static String connectionFactoryName = null;
private static String queueName = null;


private static ConnectionFactory qcf = null;
private static Connection queueConnection = null;


private Session ses = null;
private Destination queue = null;
private MessageConsumer msgConsumer = null;

public static final Logger logger = LoggerFactory
        .getLogger(QueueConsumer.class);

public QueueConsumer() {
    super();
}

public void onMessage(Message msg) {
    if (msg instanceof TextMessage) {
        try {

            //process message

        } catch (JMSException ex) {
            ex.printStackTrace();

        }
    }

}

public void run() {

    try {
        queueConnection.start();
    } catch (JMSException e) {

        e.printStackTrace();

        System.exit(-1);
    }
    while (!Thread.currentThread().isInterrupted()) {
        synchronized (this) {
            try {
                wait();
            } catch (InterruptedException ex) {
                break;
            }
        }
    }

}



public void init(String factoryName, String queue2) throws JMSException {
    try {

        qcf = new JMSConnectionFactory(factoryName);


        queueConnection = qcf.createConnection();


        ses = queueConnection.createSession(false,
                Session.CLIENT_ACKNOWLEDGE);
        queue = ses.createQueue(queue2);
        logger.info("Subscribing to destination: " + queue2);

        msgConsumer = ses.createConsumer(queue);


        msgConsumer.setMessageListener(this);

        System.out.println("Listening on queue " + queue2);

    } catch (Exception e) {
        e.printStackTrace();
        System.exit(-1);
    }

}

private static void setConnectionFactoryName(String name) {
    connectionFactoryName = name;
}

private static String getQueueName() {
    return queueName;
}

private static void setQueueName(String name) {
    queueName = name;
}

}

【问题讨论】:

    标签: jms


    【解决方案1】:
    1. 绝对是的
    2. 我只看了一眼,我注意到您将错误的消费者传递给您的第二个线程:

      Thread newThread2 = new Thread(consumer1); // has to pass consumer2
      

      除此之外,ConnectionFactory 等一些变量是静态的,并且会被多次初始化/覆盖。您只需要一个可以创建多个会话和/或消费者的连接。

    【讨论】:

    • 那么,如果我在主方法本身中创建一次连接工厂和连接,然后为一个消费者创建会话、消费者、侦听器一次,我猜这就是您所指的?
    • 是的,你应该也可以分享会话,相关:stackoverflow.com/questions/4741713/…
    • 由于会话中的消息是串行处理的,如果我从同一个会话实例而不是不同的会话实例创建消费者,那么消息将不会同时处理,只有一个消费者会得到因为它们属于同一个会话实例,所以一次一条消息?
    • @Dag 共享 JMS 会话不安全;只有连接和工厂可以共享,JMS 世界中的其他一切都不是线程安全的。
    • 感谢您的回复。已经修改了代码,有一些问题发到stackoverflow.com/questions/17114995/…,大家有空请多多指教。
    【解决方案2】:

    与您提供的代码示例相关,Oracle 不建议在已部署的应用程序上创建低级线程。 Weblogic 示例: Using Threads in WebLogic Server

    【讨论】:

      【解决方案3】:

      您可以在创建邮件容器 bean 的 applicationcontext.xml 中添加并发消费者属性,这将是一个更好的方法。

      <bean id="jmsMailContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="concurrentConsumers">
          <value>100</value>
      </property>
      <property name="connectionFactory" ref="connectionFactory"/>
      <property name="destination" ref="mailDestination"/>
      <property name="messageListener" ref="jmsMailConsumer"/>
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2015-10-03
        • 2019-10-16
        • 1970-01-01
        • 2015-02-02
        • 2012-08-11
        • 2023-03-28
        • 2018-07-13
        • 1970-01-01
        相关资源
        最近更新 更多