【问题标题】:JMS - Going from one to multiple consumersJMS - 从一个消费者到多个消费者
【发布时间】:2011-06-04 16:25:02
【问题描述】:

我有一个 JMS 客户端,它正在生成消息并通过 JMS 队列发送给它的唯一消费者。

我想要的是不止一个消费者收到这些消息。我首先想到的是将队列转换为主题,以便当前和新的消费者可以订阅并将相同的消息传递给他们。

这显然将涉及在生产者和消费者方面修改当前客户端代码。

我还想看看其他选项,例如创建第二个队列,这样我就不必修改现有的消费者。我相信这种方法有一些优点,比如(如果我错了,请纠正我)平衡两个不同队列而不是一个队列之间的负载,这可能会对性能产生积极影响。

我想就您可能会看到的这些选项和缺点/优点获得建议。非常感谢任何反馈。

【问题讨论】:

    标签: java jms message-queue messaging


    【解决方案1】:

    正如你所说,你有几个选择。

    如果将其转换为主题以获得相同的效果,则需要使消费者成为持久消费者。如果您的消费者不活着,队列提供的一件事是持久性。这取决于您使用的 MQ 系统。

    如果您想坚持使用队列,您将为每个消费者创建一个队列,并创建一个侦听原始队列的调度程序。

    Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                             -> Queue_Consumer_2 <- Consumer_2
                                             -> Queue_Consumer_3 <- Consumer_3
    

    主题的优点

    • 更容易动态添加新的消费者。所有消费者无需任何工作即可收到新消息。
    • 您可以创建循环主题,这样 Consumer_1 将收到一条消息,然后是 Consumer_2,然后是 Consumer_3
    • 可以向消费者推送新消息,而不必查询队列以使其反应。

    主题的缺点

    • 除非您的代理支持此配置,否则消息不会持久化。如果消费者离线并回来,则可能会丢失消息,除非设置了持久消费者。
    • 难以让 Consumer_1 和 Consumer_2 接收消息,但不能让 Consumer_3 接收消息。使用 Dispatcher 和 Queues,Dispatcher 无法将消息放入 Consumer_3 的队列中。

    队列的优点

    • 在消费者删除它们之前,消息会一直存在
    • 调度程序可以通过不将消息放入相应的消费者队列来过滤哪些消费者获得哪些消息。不过,这可以通过过滤器处理主题。

    队列的缺点

    • 需要创建额外的队列来支持多个消费者。在动态环境中,这不会是有效的。

    在开发消息传递系统时,我更喜欢主题,因为它给了我最大的权力,但是鉴于您已经在使用队列,它需要您更改系统的工作方式来实现主题。

    多消费者队列系统的设计与实现

    Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                             -> Queue_Consumer_2 <- Consumer_2
                                             -> Queue_Consumer_3 <- Consumer_3
    

    来源

    请记住,您还需要处理其他一些事情,例如问题异常处理、重新连接到连接以及在失去连接时排队等。这只是为了让您了解如何处理完成我所描述的。

    在真实系统中,我可能不会在第一个异常时退出。我会允许系统继续尽其所能地运行并记录错误。在这段代码中,如果将消息放入单个消费者队列失败,整个调度程序将停止。

    Dispatcher.java

    /*
     * To change this template, choose Tools | Templates
     * and open the template in the editor.
     */
    package stackoverflow_4615895;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    
    public class Dispatcher {
    
        private static long QUEUE_WAIT_TIME = 1000;
        private boolean mStop = false;
        private QueueConnectionFactory mFactory;
        private String mSourceQueueName;
        private String[] mConsumerQueueNames;
    
        /**
         * Create a dispatcher
         * @param factory
         *      The QueueConnectionFactory in which new connections, session, and consumers
         *      will be created. This is needed to ensure the connection is associated
         *      with the correct thread.
         * @param source
         *
         * @param consumerQueues
         */
        public Dispatcher(
            QueueConnectionFactory factory, 
            String sourceQueue, 
            String[] consumerQueues) {
    
            mFactory = factory;
            mSourceQueueName = sourceQueue;
            mConsumerQueueNames = consumerQueues;
        }
    
        public void start() {
            Thread thread = new Thread(new Runnable() {
    
                public void run() {
                    Dispatcher.this.run();
                }
            });
            thread.setName("Queue Dispatcher");
            thread.start();
        }
    
        public void stop() {
            mStop = true;
        }
    
        private void run() {
    
            QueueConnection connection = null;
            MessageProducer producer = null;
            MessageConsumer consumer = null;
            QueueSession session = null;
            try {
                // Setup connection and queues for receiving the messages
                connection = mFactory.createQueueConnection();
                session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
                Queue sourceQueue = session.createQueue(mSourceQueueName);
                consumer = session.createConsumer(sourceQueue);
    
                // Create a null producer allowing us to send messages
                // to any queue.
                producer = session.createProducer(null);
    
                // Create the destination queues based on the consumer names we
                // were given.
                Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
                for (int index = 0; index < mConsumerQueueNames.length; ++index) {
                    destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
                }
    
                connection.start();
    
                while (!mStop) {
    
                    // Only wait QUEUE_WAIT_TIME in order to give
                    // the dispatcher a chance to see if it should
                    // quit
                    Message m = consumer.receive(QUEUE_WAIT_TIME);
                    if (m == null) {
                        continue;
                    }
    
                    // Take the message we received and put
                    // it in each of the consumers destination
                    // queues for them to process
                    for (Queue q : destinationQueues) {
                        producer.send(q, m);
                    }
                }
    
            } catch (JMSException ex) {
                // Do wonderful things here 
            } finally {
                if (producer != null) {
                    try {
                        producer.close();
                    } catch (JMSException ex) {
                    }
                }
                if (consumer != null) {
                    try {
                        consumer.close();
                    } catch (JMSException ex) {
                    }
                }
                if (session != null) {
                    try {
                        session.close();
                    } catch (JMSException ex) {
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException ex) {
                    }
                }
            }
        }
    }
    

    Main.java

        QueueConnectionFactory factory = ...;
    
        Dispatcher dispatcher =
                new Dispatcher(
                factory,
                "Queue_Original",
                new String[]{
                    "Consumer_Queue_1",
                    "Consumer_Queue_2",
                    "Consumer_Queue_3"});
        dispatcher.start();
    

    【讨论】:

    • 这是一个很好的答案。我正在使用 JBoss 的 MOM 实现,即 HornetQ。
    • @Anonimo 上次我检查 JBoss 绝对遵守 JMS 规范。这在过去给我带来了一些烦恼,因为我动态地创建了 JMS 规范没有考虑的主题。其他像 ActiveMQ 允许您动态创建主题,它只需要在 JBoss 中更改 1 行代码即可实现相同的功能。
    • 谢谢安德鲁。您能否详细说明使用多个队列的想法?根据您的解释,生产者代码没有改变,但不确定该调度程序的位置以及它在技术术语中的表示方式。
    • @Anomimo 是的,我会尽快发布。
    【解决方案2】:

    您可能不必修改代码;这取决于你是怎么写的。

    例如,如果您的代码使用MessageProducer 而不是QueueSender 发送消息,那么它将适用于主题和队列。同样,如果您使用MessageConsumer 而不是QueueReceiver

    本质上,在 JMS 应用程序中使用非特定接口与 JMS 系统交互是一种很好的做法,例如MessageProducerMessageConsumerDestination 等。如果是这样,那就是“仅仅" 配置问题。

    【讨论】:

    • 那将是一个不错的选择。不幸的是,我们正在使用特定的接口,例如 QueueSender。如果我们重构,我肯定会记住这一点。
    猜你喜欢
    • 2018-03-07
    • 2012-07-14
    • 2021-05-14
    • 2019-06-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-04-12
    • 2020-09-12
    相关资源
    最近更新 更多