【问题标题】:ActiveMQ Producer Multiple Queues One SessionActiveMQ Producer 多队列一个会话
【发布时间】:2017-09-18 03:29:32
【问题描述】:

如何使用 ActiveMQ 中的持久连接/会话将消息排入不同队列?

我做了什么:

public class ActiveMQProducer {

    private static final Logger LOGGER = Logger.getLogger(ActiveMQProducer.class);
    private Connection connection;
    private MessageProducer producer;
    private Session session;
    String activeMQConnection;

    public ActiveMQProducer() throws ConfigurationException, JMSException {
        activeMQConnection = ActiveMQPropertyManagerFactory.getInstance().getString("active.mq.url");
    }

    public void setupActiveMQ(String queueName) throws JMSException {

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(activeMQConnection);
        factory.setRejectedTaskHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        connection = factory.createConnection();
        connection.start();

        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);
        producer = session.createProducer(queue);

    }

    public void getConnection(String queueName) throws JMSException {

        if (connection == null || session == null) {
            Object object = new Object();
            synchronized (object) {
                setupActiveMQ(queueName);
            }
        }
    }

    public <T extends Serializable> T sendToActiveMQ(String queueName, T t) throws JMSException {
        getConnection(queueName);
        ObjectMessage message = session.createObjectMessage(t);
        producer.send(message);
        return null;
    }

    public void sendMessageToActiveMQ(String queueName, String message) throws JMSException {
        getConnection(queueName);
        TextMessage toSend = session.createTextMessage(message);
        producer.send(toSend);
    }
}

我已经意识到,通过使用它并将消息发送到不同的队列最终 ActiveMQ 会耗尽连接,因为我从未关闭连接或会话:

org.apache.activemq.transport.tcp.ExceededMaximumConnectionsException: Exceeded the maximum number of allowed client connections.

处理这个问题的正确方法是什么?我有大约 5 个队列我必须向其发送不同的消息,我是否应该打开一个新连接、入队并关闭连接,是否有办法保持会话/连接持久?

谢谢。

【问题讨论】:

    标签: java jms activemq


    【解决方案1】:

    这里有一些解决方案:

    每个目的地 1 个生产者:

    import java.io.Serializable;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ThreadPoolExecutor;
    
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.ObjectMessage;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ConfigurationException;
    import org.apache.log4j.Logger;
    
    public class ActiveMQProducer {
    
        private static final Logger LOGGER = Logger.getLogger(ActiveMQProducer.class);
        private Connection connection;
        private Session session;
        String activeMQConnection;
        Map<String, MessageProducer> producers = Collections.synchronizedMap(new HashMap<String, MessageProducer>());
        Thread shutdownHook = new Thread(new Runnable() {
            @Override
            public void run() {
                close();
            }
        });
    
        public ActiveMQProducer() throws ConfigurationException, JMSException {
            activeMQConnection = ActiveMQPropertyManagerFactory.getInstance().getString("active.mq.url");
            setupActiveMQ();
            Runtime.getRuntime().addShutdownHook(shutdownHook);
        }
    
        public void setupActiveMQ() throws JMSException {
            close();
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(activeMQConnection);
            factory.setRejectedTaskHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        }
    
        @Override
        protected void finalize() throws Throwable {
            close();
            super.finalize();
        }
    
        public void close() {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                }
                connection = null;
            }
        }
    
        public void getConnection() throws JMSException {
            if (connection == null || session == null) {
                setupActiveMQ();
            }
        }
    
        public MessageProducer getProducer(String queueName) throws JMSException {
            getConnection();
            MessageProducer producer = producers.get(queueName);
            if (producer == null) {
                Queue queue = session.createQueue(queueName);
                producer = session.createProducer(queue);
                producers.put(queueName, producer);
            }
            return producer;
        }
    
        public <T extends Serializable> T sendToActiveMQ(String queueName, T t) throws JMSException {
            MessageProducer producer = getProducer(queueName);
            ObjectMessage message = session.createObjectMessage(t);
            producer.send(message);
            return null;
        }
    
        public void sendMessageToActiveMQ(String queueName, String message) throws JMSException {
            MessageProducer producer = getProducer(queueName);
            TextMessage toSend = session.createTextMessage(message);
            producer.send(toSend);
        }
    }
    

    所有目的地的 1 个制作人:

    import java.io.Serializable;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ThreadPoolExecutor;
    
    import javax.jms.Connection;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.ObjectMessage;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ConfigurationException;
    import org.apache.log4j.Logger;
    
    public class ActiveMQProducer2 {
    
        private static final Logger LOGGER = Logger.getLogger(ActiveMQProducer2.class);
        private Connection connection;
        private Session session;
        String activeMQConnection;
        Map<String, Destination> destinations = Collections.synchronizedMap(new HashMap<String, Destination>());
        private MessageProducer producer;
        Thread shutdownHook = new Thread(new Runnable() {
            @Override
            public void run() {
                close();
            }
        });
    
        public ActiveMQProducer2() throws ConfigurationException, JMSException {
            activeMQConnection = ActiveMQPropertyManagerFactory.getInstance().getString("active.mq.url");
            setupActiveMQ();
            Runtime.getRuntime().addShutdownHook(shutdownHook);
        }
    
        public void setupActiveMQ() throws JMSException {
            close();
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(activeMQConnection);
            factory.setRejectedTaskHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(session.createTemporaryQueue());
        }
    
        @Override
        protected void finalize() throws Throwable {
            close();
            super.finalize();
        }
    
        public void close() {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                }
                connection = null;
            }
        }
    
        public void getConnection() throws JMSException {
            if (connection == null || session == null) {
                setupActiveMQ();
            }
        }
    
        public Destination getDestination(String queueName) throws JMSException {
            getConnection();
            Destination destination = destinations.get(queueName);
            if (destination == null) {
                destination = session.createQueue(queueName);
                destinations.put(queueName, destination);
            }
            return destination;
        }
    
        public <T extends Serializable> T sendToActiveMQ(String queueName, T t) throws JMSException {
            Destination destination = getDestination(queueName);
            ObjectMessage message = session.createObjectMessage(t);
            producer.send(destination, message);
            return null;
        }
    
        public void sendMessageToActiveMQ(String queueName, String message) throws JMSException {
            Destination destination = getDestination(queueName);
            TextMessage toSend = session.createTextMessage(message);
            producer.send(destination, toSend);
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-09-11
      • 1970-01-01
      • 2013-01-16
      • 2011-12-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多