【问题标题】:Delayed message in RabbitMQRabbitMQ 中的延迟消息
【发布时间】:2011-05-25 13:24:29
【问题描述】:

是否可以通过 RabbitMQ 延迟发送消息? 例如,我想在 30 分钟后使客户端会话过期,并发送一条消息,该消息将在 30 分钟后处理。

【问题讨论】:

标签: rabbitmq


【解决方案1】:

您可以尝试两种方法:

旧方法:在每条消息/队列(策略)中设置 TTL(生存时间)标头,然后引入 DLQ 来处理。一旦 ttl 过期,您的消息将从 DLQ 移动到主队列,以便您的侦听器可以处理它。

最新方法: 最近 RabbitMQ 提出了 RabbitMQ 延迟消息插件 ,使用它您可以实现相同的功能,并且该插件支持从 RabbitMQ-3.5.8 开始提供。

您可以声明类型为 x-delayed-message 的交换,然后使用自定义标头 x-delay 发布消息,以毫秒为单位表示消息的延迟时间。消息将在 x-delay 毫秒

后传送到相应的队列
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new 
AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

更多:git

【讨论】:

  • 延迟消息插件对队列中的总延迟消息计数有限制,因为它使用 Mnesia 存储这些延迟消息 (github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72)
  • “延迟交换插件”是最优雅的解决方案,但在生产级项目中采用它之前必须小心。对于@henrylei 提到的限制,可能会丢失消息,因为消息在实际发布到交换器之前是没有冗余地存储的。对于繁重的负载和关键功能,“假”死信仍然是最负责任的替代方案
【解决方案2】:

随着 RabbitMQ v2.8 的发布,现在可以使用预定交付,但这是一个间接功能:http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

【讨论】:

  • 我尝试了这种方法,但遇到了一些问题,有什么建议吗? blog.james-carr.org/2012/03/30/…
  • 我做了一个秒杀并击中了几个showstoppers:1.消息只有DLQ:在Q顶部时(rabbitmq.com/ttl.html - 警告部分)这意味着如果我首先设置msg 1 4 小时后过期,msg2 1 小时后过期 msg2 只会在 msg1 过期后过期。 2. 消息的 TTL 由 Rabbit 保存,因此假设您使用 10 秒的短超时。如果消费者在消息过期后 10 秒内无法消费消息(由于积压),它将被丢弃并丢失 以上已通过 Rabbit 3.0.1 进行了验证 你们看到任何解决方法吗?
【解决方案3】:

感谢Norman's answer,我可以在 Node.js 中实现它。

从代码中一切都很清楚。

var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});

// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
      deadLetterExchange: "my_final_delayed_exchange",
      messageTtl: 5000, // 5sec
}, function (err, q) {
      ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
      ch.bindQueue(q.queue, "my_final_delayed_exchange", '');

      ch.consume(q.queue, function (msg) {
          console.log("delayed - [x] %s", msg.content.toString());
      }, {noAck: true});
});

【讨论】:

    【解决方案4】:

    由于我没有足够的声誉来添加评论,因此发布了一个新答案。这只是对http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html 已经讨论过的内容的补充

    除了在消息上设置 ttl 之外,您可以在队列级别设置它。您也可以避免仅仅为了将消息重定向到不同的队列而创建新的交换。这是示例 Java 代码:

    制作人:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.util.HashMap;
    import java.util.Map;
    
    public class DelayedProducer {
        private final static String QUEUE_NAME = "ParkingQueue";
        private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";
    
        public static void main(String[] args) throws Exception{
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
    
            Map<String, Object> arguments = new HashMap<String, Object>();
            arguments.put("x-message-ttl", 10000);
            arguments.put("x-dead-letter-exchange", "");
            arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME );
            channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
    
            for (int i=0; i<5; i++) {
                String message = "This is a sample message " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println("message "+i+" got published to the queue!");
                Thread.sleep(3000);
            }
    
            channel.close();
            connection.close();
        }
    }
    

    消费者:

    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class Consumer {
       private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";
    
        public static void main(String[] args) throws Exception{
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            QueueingConsumer consumer = new QueueingConsumer(channel);
            boolean autoAck = false;
            channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer);
    
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println(" [x] Received '" + message + "'");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
    

    【讨论】:

    • 非常感谢。我认为你在消费者队列中有一个小错误 declare channel.queueDeclare(QUEUE_NAME, false, false, false, null);它应该有“DESTINATION_QUEUE_NAME”而不是“QUEUE_NAME”。真的非常非常感谢你
    【解决方案5】:

    看起来this blog post 描述了使用死信交换和消息 ttl 来做类似的事情。

    下面的代码使用 CoffeeScript 和 Node.js 来访问 Rabbit 并实现类似的东西。

    amqp   = require 'amqp'
    events = require 'events'
    em     = new events.EventEmitter()
    conn   = amqp.createConnection()
      
    key = "send.later.#{new Date().getTime()}"
    conn.on 'ready', ->
      conn.queue key, {
        arguments:{
          "x-dead-letter-exchange":"immediate"
        , "x-message-ttl": 5000
        , "x-expires": 6000
        }
      }, ->
        conn.publish key, {v:1}, {contentType:'application/json'}
      
      conn.exchange 'immediate'
     
      conn.queue 'right.now.queue', {
          autoDelete: false
        , durable: true
      }, (q) ->
        q.bind('immediate', 'right.now.queue')
        q.subscribe (msg, headers, deliveryInfo) ->
          console.log msg
          console.log headers
    

    【讨论】:

    • 答案中的链接已失效 - "PAGE NOT FOUND | 404".
    【解决方案6】:

    目前这是不可能的。您必须将过期时间戳存储在数据库或类似的东西中,然后有一个帮助程序读取这些时间戳并将消息排队。

    延迟消息是一项经常被请求的功能,因为它们在许多情况下都很有用。但是,如果您需要使客户端会话过期,我认为消息传递不是您的理想解决方案,而另一种方法可能会更好。

    【讨论】:

      【解决方案7】:

      假设你控制了消费者,你可以像这样实现对消费者的延迟??:

      如果我们确定队列中的第 n 条消息总是比第 n+1 条消息具有更小的延迟(这对于许多用例来说都是正确的):生产者在任务中发送 timeInformation 来传达该作业需要的时间要执行(当前时间 + 延迟)。消费者:

      1) 从任务中读取预定时间

      2) 如果 currentTime > scheduledTime 继续。

      其他延迟 = scheduleTime - currentTime

      按延迟指示的时间睡觉

      消费者总是配置有并发参数。因此,其他消息将在队列中等待,直到消费者完成工作。所以,这个解决方案可以很好地工作,虽然它看起来很尴尬,特别是对于大的时间延迟。

      【讨论】:

        【解决方案8】:

        AMQP 协议不支持延迟消息传递,但通过使用Time-To-Live and Expiration Dead Letter Exchanges 扩展延迟消息传递是可能的。该解决方案在此link 中进行了描述。我从该链接复制了以下步骤:

        一步一步:

        Declare the delayed queue
            Add the x-dead-letter-exchange argument property, and set it to the default exchange "".
            Add the x-dead-letter-routing-key argument property, and set it to the name of the destination queue.
            Add the x-message-ttl argument property, and set it to the number of milliseconds you want to delay the message.
        Subscribe to the destination queue
        

        RabbitMQ repository on GitHub 中还有一个延迟消息的插件。

        请注意,有一个名为 Celery 的解决方案通过提供一个名为 apply_async() 的调用 API 来支持 RabbitMQ 代理上的延迟任务排队。 Celery 支持 Python、node 和 PHP。

        【讨论】:

          最近更新 更多