【问题标题】:How to set timeout to RabbitMQ DefaultConsumer?如何将超时设置为 RabbitMQ DefaultConsumer?
【发布时间】:2019-11-12 01:19:08
【问题描述】:

我有一个作为 rabbitmq 生产者的应用程序。我已经应用了 RPC 方法,没有问题。生产者发布消息并在replyQueues(临时队列)中消费其响应。首先,我使用 QueueingConsumer 进行生产者消费,并且我曾经为 nextDelivery(timeout) 方法设置超时。 QueueingConsumer 现在已弃用,在 RabbitMQ 官方站点中,他们更改了 RPC 教程,并且使用 DefaultConsumer 而不是 QueueingConsumer。我也用 DefaultConsumer 替换了 QueueingConsumer。但是现在有一个问题:如何给 DefaultConsumer 设置一个超时时间?因为如果消费者没有发送任何响应,垃圾临时队列将保留在代理中。新旧生产者消费部分如下。感谢您的帮助。

老生产者消费方式:

    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        while (true) {
            QueueingConsumer.Delivery deliver = consumer.nextDelivery(timeout);
            if (deliver.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(deliver.getBody(), "UTF-8");
                break;
            }
        }

        return response;

新的生产者消费方式:

      final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            if (properties.getCorrelationId().equals(corrId)) {
                response.offer(new String(body, "UTF-8"));
            }
        }
    };

    channel.basicConsume(replyQueueName, true, consumer);

    return response.take();

【问题讨论】:

  • 还不知道 Java 的 BlockingQueue 接口。很棒的东西...

标签: java rabbitmq


【解决方案1】:

解决了。可以将超时设置为“响应”对象。 “新生产者消费方式”的变化如下:

响应超时:必须使用response.poll(5000, TimeUnit.MILLISECONDS) 而不是response.take()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2010-11-23
    • 2019-06-13
    • 1970-01-01
    • 2021-03-28
    • 2016-06-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多