【问题标题】:sending acknowledgement from consumer to producer and handle it in activemq and rabbitmq从消费者向生产者发送确认并在activemq和rabbitmq中处理
【发布时间】:2018-04-16 10:49:35
【问题描述】:

据我所知,ActiveMQ 有一个称为 AUTO Acknowledge 的功能,它实际上通知代理已收到消息(不确认生产者)。

我想知道是否可以在 ActiveMQ 或 RabbitMQ 中从消费者向生产者发送确认。然后我想在生产者中处理确认消息,如果它没有收到确认,则再次将消息发送给消费者。

【问题讨论】:

  • 听起来要么您不信任您的 MQ,要么您正在寻找同步/阻塞调用。如果是后者,为什么不采用这种方式设计的东西,比如 HTTP?

标签: rabbitmq activemq amqp


【解决方案1】:

您想在异步介质上执行同步用例。

在 RabbitMQ 的情况下,您可以使用 RPC,如此处所述 - https://www.rabbitmq.com/tutorials/tutorial-six-python.htmlhttps://www.rabbitmq.com/direct-reply-to.html

请注意,即使是作者也建议避免使用它:

如有疑问,请避免使用 RPC。如果可以,您应该使用异步管道 - 而不是类似 RPC 的阻塞,结果被异步推送到下一个计算阶段。


RabbitMQ Java 客户端通过com.rabbitmq.client.Channel.basicConsume 提供自动确认。

【讨论】:

    【解决方案2】:

    至少对于 ActiveMQ - 这是内置的。您必须在 activemq.xml 中打开它

    <policyEntry queue=">" advisoryForConsumed="true"/>
    

    只需收听您要监控已消费消息的队列的咨询主题。然后您可以提取消息 id:s 以及不“勾选”未完成请求的内容。

    对于完整的端到端确认,我推荐一些更自定义的东西。 IE。您的生产者应用程序应该收听一些“响应”队列,该队列接收有关所生成消息状态的响应。 IE。如果处理失败 - 您可能想知道原因等等。

    无论如何,这里有一些带有生产者的代码,该生产者也侦听来自 ActiveMQ 的确认。

    public void run() throws Exception {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        conn = cf.createConnection();
        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination dest = sess.createQueue("duck");
        MessageConsumer mc = sess.createConsumer(AdvisorySupport.getMessageConsumedAdvisoryTopic(dest));
        mc.setMessageListener(this);
        conn.start();
    
        MessageProducer mp = sess.createProducer(sess.createQueue("duck"));
        mp.send(sess.createTextMessage("quack"));
    }
    
    
    public void onMessage(Message msg) {
        try {
            String msgId = msg.getStringProperty("orignalMessageId");
            System.out.println("Msg: " + msgId + " consumed");
        } catch ( Exception e) {
            e.printStackTrace();
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2018-04-12
      • 2021-06-12
      • 2019-05-20
      • 2015-10-03
      • 2011-03-12
      • 2012-01-10
      • 1970-01-01
      • 2013-03-25
      • 2013-08-01
      相关资源
      最近更新 更多