【问题标题】:What is the use case of BrokerService in ActiveMQ and how to use it correctlyActiveMQ中BrokerService的用例是什么以及如何正确使用
【发布时间】:2018-03-04 19:40:27
【问题描述】:

我是 ActiveMQ 的新手。我正在尝试通过检查 Apache 在此链接上提供的示例代码来研究和检查它是如何工作的:-

http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html

    public class Server implements MessageListener {
        private static int ackMode;
        private static String messageQueueName;
        private static String messageBrokerUrl;

        private Session session;
        private boolean transacted = false;
        private MessageProducer replyProducer;
        private MessageProtocol messageProtocol;

        static {
            messageBrokerUrl = "tcp://localhost:61616";
            messageQueueName = "client.messages";
            ackMode = Session.AUTO_ACKNOWLEDGE;
        }

        public Server() {
            try {
                //This message broker is embedded
                BrokerService broker = new BrokerService();
                broker.setPersistent(false);
                broker.setUseJmx(false);
                broker.addConnector(messageBrokerUrl);
                broker.start();
            } catch (Exception e) {
                System.out.println("Exception: "+e.getMessage());
                //Handle the exception appropriately
            }

            //Delegating the handling of messages to another class, instantiate it before setting up JMS so it
            //is ready to handle messages
            this.messageProtocol = new MessageProtocol();
            this.setupMessageQueueConsumer();
        }

        private void setupMessageQueueConsumer() {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);
            Connection connection;
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                this.session = connection.createSession(this.transacted, ackMode);
                Destination adminQueue = this.session.createQueue(messageQueueName);

                //Setup a message producer to respond to messages from clients, we will get the destination
                //to send to from the JMSReplyTo header field from a Message
                this.replyProducer = this.session.createProducer(null);
                this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                //Set up a consumer to consume messages off of the admin queue
                MessageConsumer consumer = this.session.createConsumer(adminQueue);
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                System.out.println("Exception: "+e.getMessage());
            }
        }

        public void onMessage(Message message) {
            try {
                TextMessage response = this.session.createTextMessage();
                if (message instanceof TextMessage) {
                    TextMessage txtMsg = (TextMessage) message;
                    String messageText = txtMsg.getText();
                    response.setText(this.messageProtocol.handleProtocolMessage(messageText));
                }

                //Set the correlation ID from the received message to be the correlation id of the response message
                //this lets the client identify which message this is a response to if it has more than
                //one outstanding message to the server
                response.setJMSCorrelationID(message.getJMSCorrelationID());

                //Send the response to the Destination specified by the JMSReplyTo field of the received message,
                //this is presumably a temporary queue created by the client
                this.replyProducer.send(message.getJMSReplyTo(), response);
            } catch (JMSException e) {
                System.out.println("Exception: "+e.getMessage());
            }
        }

        public static void main(String[] args) {
            new Server();
        }
    }

我对 messageBrokerUrl 的困惑 = "tcp://localhost:61616";您知道 ActiveMQ 服务默认在端口 61616 上运行。为什么这个例子选择相同的端口。如果我尝试运行代码 thows eception 为: 异常:无法绑定到服务器套接字:tcp://localhost:61616 由于:java.net.BindException:地址已在使用中:JVM_Bind

也许如果我更改端口号,我可以执行代码。

请告诉我示例中为什么会这样以及如何使用 BrokerService。

【问题讨论】:

    标签: activemq broker


    【解决方案1】:

    本示例中的 BrokerService 正在尝试在内存中创建一个 ActiveMQ 代理以供示例使用。鉴于您看到的错误,我猜您已经在绑定到端口 61616 的机器上运行了一个 ActiveMQ 代理,因为这是默认端口,因此两者存在冲突。您可以停止外部代理并运行示例,也可以修改示例以不运行嵌入式代理而仅依赖您的外部代理实例。

    嵌入式代理非常适合单元测试或创建不需要用户安装和运行代理的示例。

    【讨论】:

    • 嵌入式 BrokerService 与安装的 ActiveMQ 代理?有相同或不同的功能吗?还可以提及这两个方面的任何利弊
    • 那是讨论堆栈溢出的话题,我已经回答了你的基本问题,其他信息应该在 ActiveMQ 社区中请求。
    • 我正在重新访问 ActiveMQ,需要运行一个示例以恢复速度。同样的困惑,并使用这个非常相同的 ActiveMQ 示例。对于该示例,我没有注意到的是“setupMessageQueueConsumer()”设置了 BrokerService,但如果您已经运行了 ActiveMQ,则该部分是不必要的。所以我从构造函数中删除了整个 try-catch 块,只留下两行用于消息队列使用者的设置。在事后很明显,但是这个答案使我正确地了解了我在代码中忽略的内容。 [upvoting]干杯!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多