【问题标题】:Can't get ActiveMQ to resend my messages无法让 ActiveMQ 重新发送我的消息
【发布时间】:2012-01-24 11:56:35
【问题描述】:

我有一个用 Java 编写的单线程 ActiveMQ 消费者。我要做的就是从队列中接收()一条消息,尝试将其发送到 Web 服务,如果成功则确认()它。如果 Web 服务调用失败,我希望消息保留在队列中并在超时后重新发送。

除了重新发送部分外,它或多或少都在工作:每次我重新启动消费者时,它都会为仍在队列中的每个消息获取一条消息,但在发送失败后,这些消息永远不会重新发送。

我的代码如下:

public boolean init() throws JMSException, FileNotFoundException, IOException {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    RedeliveryPolicy policy = new RedeliveryPolicy();
    policy.setInitialRedeliveryDelay(500);
    policy.setBackOffMultiplier(2);
    policy.setUseExponentialBackOff(true);

    connectionFactory.setRedeliveryPolicy(policy);
    connectionFactory.setUseRetroactiveConsumer(true); // ????
    Connection connection = connectionFactory.createConnection();

    connection.setExceptionListener(this);
    connection.start();

    session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
    destination = session.createQueue(subject); //???

    consumer = session.createConsumer(destination);
    //consumer.setMessageListener(this); // message listener had same behaviour

}

private void process() {
    while(true) {
        System.out.println("Waiting...");
        try {
            Message message = consumer.receive();
            onMessage(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

@Override
public void onMessage(Message message) {
    System.out.println("onMessage");
    messagesReceived++;

    if (message instanceof TextMessage) {
        try {
            TextMessage txtMsg = (TextMessage) message;
            String msg = txtMsg.getText();

            if(!client.sendMessage(msg)) {
                System.out.println("Webservice call failed. Keeping message");
                //message.
            } else {
                message.acknowledge();
            }

            if (transacted) {
                if ((messagesReceived % batch) == 0) {
                    System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived);
                    session.commit();
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

我目前没有使用交易(也许我应该使用?)。

我确定我错过了一些简单的东西,很快就会拍我的额头,但我似乎无法弄清楚这应该如何工作。谢谢!


编辑:因为没有足够的代表,我自己无法回答:

好的,经过更多的实验,事实证明事务是做到这一点的唯一方法。这是新代码:

public boolean init() throws JMSException, FileNotFoundException, IOException {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    RedeliveryPolicy policy = new RedeliveryPolicy();
    policy.setInitialRedeliveryDelay(1000L);
    policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);

    connectionFactory.setRedeliveryPolicy(policy);
    connectionFactory.setUseRetroactiveConsumer(true);
    Connection connection = connectionFactory.createConnection();

    connection.setExceptionListener(this);
    connection.start();

    session = connection.createSession(transacted, ActiveMQSession.CLIENT_ACKNOWLEDGE);
    destination = session.createQueue(subject);

    consumer = session.createConsumer(destination);
}

@Override
public void onMessage(Message message) {
    System.out.println("onMessage");
    messagesReceived++;

    if (message instanceof TextMessage) {
        try {
            TextMessage txtMsg = (TextMessage) message;
            String msg = txtMsg.getText();

            if(client.sendMessage(msg)) {
                if(transacted) {
                    System.out.println("Call succeeded - committing message");
                    session.commit();
                }
                //message.acknowledge();
            } else {
                if(transacted) {
                    System.out.println("Webservice call failed. Rolling back message");
                    session.rollback();
                }
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

现在,按照重新投递政策的规定,每 1000 毫秒重新发送一次消息。

希望这对其他人有帮助! :)

【问题讨论】:

  • 确实很有帮助。谢谢!
  • 哇,这救了我。显然,activemq 通常会在 10 秒左右将事情“放回队列”,除非您像以前那样指定 1。
  • 对初学者很有帮助。尤其是为什么你提到1000ms
  • 如果使用单独的确认模式(就像你原来的那样),连接必须关闭,否则 ActiveMQMessageConsumer 会认为它是重复的并自动忽略它。
  • @MartinSerrano - 谢谢!这就解释了为什么我转发的消息没有送达!

标签: java activemq


【解决方案1】:

您不必使用事务,CLIENT_ACK/Session.recover() 也可以...

当发生以下任何情况时,消息将重新传递给客户端:

  • 使用事务处理会话并调用 rollback()。
  • 事务会话在调用提交之前关闭。
  • 会话正在使用 CLIENT_ACKNOWLEDGE 并调用了 Session.recover()。

http://activemq.apache.org/message-redelivery-and-dlq-handling.html

【讨论】:

  • 我尝试了第三点,您通过以下方式提出的建议。 Session session=connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);。在业务代码之后,我想重新发送消息到Activemq,然后我才能再次处理消息(哪个过程失败)。为此,我这样写。 session.recover();。但是在评估了这个程序之后,Activemq 没有向订阅者传递消息。有关我尝试的更多信息,请查看此Link
猜你喜欢
  • 2019-09-29
  • 1970-01-01
  • 1970-01-01
  • 2012-04-28
  • 1970-01-01
  • 2013-08-10
  • 1970-01-01
  • 2013-01-09
  • 1970-01-01
相关资源
最近更新 更多