【问题标题】:Active MQ 5.10.0 message re-delivery policy not workingActivemq 5.10.0 消息重新传递策略不起作用
【发布时间】:2014-08-13 17:40:17
【问题描述】:

我以前从未使用过activeMQ。我正在为我的项目做一些研究,我需要根据听众的逻辑重新传递消息。根据activemq's documentation,我是:

  1. 使用 Session.CLIENT_ACKNOWLEDGE 模式
  2. 根据逻辑,确认消息或执行 session.recover 以再次获取消息。

使用我当前的代码,我可以一次从 testQueue 中读取消息。 但是我的重新交付政策不起作用。我尝试设置初始延迟,但消息会自动传递。我希望消息被重新传递 5 次。那没有发生。

我正在使用以下代码创建连接工厂、连接、会话和队列:

    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(0);
    redeliveryPolicy.setMaximumRedeliveries(5);
    redeliveryPolicy.setRedeliveryDelay(1000);
    redeliveryPolicy.setQueue("*");
    redeliveryPolicy.setBackOffMultiplier(2);

    factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);     
    factory.setRedeliveryPolicy(redeliveryPolicy);

    connection = (ActiveMQConnection) factory.createConnection();       
    // connection.setRedeliveryPolicy(redeliveryPolicy);

    connection.start();
    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

    String queueName = "testQueue"+(UUID.randomUUID());
    System.out.println("Creating Queue: "+queueName);
    testQueue = session.createQueue(queueName);

我尝试将 RedeliveryPolicy 设置为 ActiveMQConnectionFactory 和 ActiveMQConnection(对我不起作用。)

然后我在一个消费者中做以下设置消息监听器:

MessageConsumer consumer = session.createConsumer(testQueue);
consumer.setMessageListener(new MyMessageListener(id, session));

我的消息监听器:

public void onMessage(Message message) {
    try {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText();
            System.out.println("Consuming["+threadId+"] message: " + text);


            if(text.contains("RD")){
                System.out.println("acknowledgeThisMessage");
                ((com.sun.messaging.jms.Message)message).acknowledgeThisMessage();
            } else{
                session.recover();
            }
        }
        else{
            System.out.println("Consuming["+threadId+"] Weird Message: " + message);
        }
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

如果您需要更多信息,请告诉我,我可以更新帖子。提前致谢。

【问题讨论】:

    标签: java activemq


    【解决方案1】:

    如果我在消费者内部创建侦听器,我就能让 RedeliveryPolicies 正常工作。我不确定为什么这行得通,而单独开设一个班级却行不通。我的消费者现在看起来像:

    try {           
            MessageConsumer consumer = session.createConsumer(testQueue);
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        if (message instanceof TextMessage) {
                            TextMessage textMessage = (TextMessage) message;
                            String text = textMessage.getText();
                            System.out.println("Consuming["+threadId+"] message: " + text);
    
                            if(text.contains("RD")){
                                System.out.println("acknowledgeThisMessage");
                                ((com.sun.messaging.jms.Message)message).acknowledgeThisMessage();
                            } else{
                                System.out.println("recovering the message");
                                session.recover();
                            }
                        }
                        else{
                            System.out.println("Consuming["+threadId+"] Weird Message: " + message);
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        }
    

    希望这对某人有所帮助。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-07-22
      • 2017-05-02
      • 2015-06-24
      • 2014-06-28
      • 2011-01-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多