【问题标题】:RabbitMQ QueueingConsumer possible memory leakRabbitMQ QueueingConsumer 可能存在内存泄漏
【发布时间】:2012-09-23 03:13:05
【问题描述】:

我有以下代码来声明一个队列:

Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(getQueueName(), false, false, false, null);
consumer = new QueueingConsumer(channel);
channel.basicConsume(getQueueName(), true,consumer);

以及以下获取下一个 Delivery 对象并对其进行处理:

    Delivery delivery = null;
    T queue = null;

    //loop over, continuously retrieving messages
    while(true) {

        try {
            delivery = consumer.nextDelivery();
            queue = deserialise(delivery.getBody());

            process(queue);

        } catch (ShutdownSignalException e) {
            logger.warn("Shutodwon signal received.");
            break;
        } catch (ConsumerCancelledException e) {
            logger.warn("Consumer cancelled exception: {}",e.getMessage());
            break;
        } catch (InterruptedException e) {
            logger.warn("Interuption exception: {}", e);
            break;
        }
    }

反序列化代码。如您所见,我正在使用 Kryo:

public T deserialise(byte[] body) {
    Kryo kryo= new Kryo();
    Input input = new Input(body);
    T deserialised = kryo.readObject(input, getQueueClass());
    input.close();

    return deserialised;
}

如果我使用包含大量对象的队列运行此程序,则在大约 270 万个对象之后,我会遇到内存不足异常。我最初是通过在夜间运行它来发现这一点的,数据从 JMeter 以大约 90/s 的速度输入,起初它毫无问题地消耗,但早上我注意到 RabbitMQ 中有大量数据并且内存不足异常消费者。我再次运行它并使用 Eclipse Memory Analyzer 来确定该内存的使用位置。从这里我可以看到 com.rabbitmq.client.QueueingConsumer 引用的 java.util.concurrent.LinkedBlockingQueue 越来越大,直到内存不足。

我需要做些什么来告诉 Rabbit 释放资源吗?

我可以增加堆大小,但我担心这只是一个短期修复,我的代码中可能存在一些问题,可能会在生产部署几个月后出现内存泄漏。

【问题讨论】:

    标签: java memory-leaks jms rabbitmq


    【解决方案1】:

    问题似乎是您的消费者无法跟上您的生产者,导致您的队列无限制地增长。您需要限制队列的大小并在达到限制时减慢生产者的速度。我还会考虑优化您的消费者,使其跟上。

    【讨论】:

    • 维持一段时间就好了;至少在我观看的第一个小时内,消息以约 120/s 的速度到达并立即被消耗。当我一夜之间运行这个程序时,早上我有 434 万条消息在 Rabbit 中没有被消费。因此,我重新启动了我的消费者,它以超过 5000/s 的速度消耗,然后在消耗了大约 270 万条消息后内存不足。消费者似乎可以跟得上,但内存不足,因为 QueueingConsumer 内的 LinkedBlockingQueue 增长太快。
    • 如果队列在增长,那么消费者不可能跟上生产者的步伐。消费者开始时可能足够快,但随着时间的推移会变慢。
    • 啊,现在我想知道当我看到它正在消耗时,它实际上只是将消息放入 LinkedBlockingQueue 上的内存中,这并不意味着它正在被正确消耗。这可能是有道理的。
    • 将任务添加到队列的唯一自然方法是将其从队列中移除或移除。
    • 顺便说一句,这只是我测试生产中不应该发生的极端情况。我们不太可能以我一直在测试的速度获得数据,而且肯定不会持续很长时间。
    【解决方案2】:

    这可能是对象被消耗后没有被销毁的问题。你能显示反序列化的代码吗?我怀疑您正在通过队列发送对象并使用某种对象输入流/字节数组输入流对它们进行反序列化。如果您没有正确关闭可能导致内存泄漏的流。

    【讨论】:

    • 我已将反序列化代码添加到我的问题中。反序列化代码使用 Kryo。我一直在使用 Eclipse 内存分析器,99% 的内存都被 LinkedBlockingQueue 消耗,而且一直在增长。这被 QueueingConsumer 引用。
    【解决方案3】:

    解决办法是设置basicQos - channel.basicQos(2);。我的频道声明现在如下所示:

            Connection connection = RabbitConnection.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(getQueueName(), false, false, false, null);
            consumer = new QueueingConsumer(channel);
            channel.basicConsume(getQueueName(), true,consumer);
            channel.basicQos(2);
    

    将 basicQos 设置为 2 意味着仅在内部存储器中保留 2 条消息。有关使用 CoDel 算法的更多信息和有趣的讨论,请参阅http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/

    【讨论】:

    • 根据我刚刚阅读的内容,如果这解决了您在原始代码中消耗速度不够快的问题,并且您的缓冲区因队列中读取但尚未处理的消息而变得非常大。将 QOS 更改为 2 意味着仅缓冲 2 条消息,队列将缓冲其余消息。在这种情况下,您的队列不是变得很大而不是您的内存使用量很大吗?
    • 刚刚测试了这个,内存还在增长,队列没有变大。这似乎没有做任何事情。我不介意队列变大,因为我可以添加更多消费者,而且我将消息放入队列的速率大约是预期速率的 30 倍。我只是担心如果消费者发生内存泄漏,那么我会在生产一个月后发现;可能在我度假的时候:/
    • 我很确定这是一个 GC 问题。对数据的一些引用被保留,并且内存使用量随着每条传入消息而增长。
    【解决方案4】:

    我的错误是我将频道设置为自动确认。这意味着来自 Rabbit 的每条消息都得到了确认(确认为已收到)。我通过将通道声明为不自动确认来修复(并测试)这个问题:channel.basicConsume(getQueueName(), false,consumer);,在处理队列之后,我确认消息:consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);

    这就是我的队列声明现在的样子:

            Connection connection = RabbitConnection.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(getQueueName(), false, false, false, null);
            consumer = new QueueingConsumer(channel);
            channel.basicConsume(getQueueName(), false,consumer);
    

    以及以下处理队列:

        Delivery delivery = null;
        T queue = null;
    
        //loop over, continuously retrieving messages
        while(true) {
    
            try {
                delivery = consumer.nextDelivery();
                queue = deserialise(delivery.getBody());
                process(queue);
                consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    
            } catch (ShutdownSignalException e) {
                logger.warn("Shutodwon signal received.");
                break;
            } catch (ConsumerCancelledException e) {
                logger.warn("Consumer cancelled exception: {}",e.getMessage());
                break;
            } catch (InterruptedException e) {
                logger.warn("Interuption exception: {}", e);
                break;
            } catch (IOException e) {
                logger.error("Could not ack message: {}",e);
                break;
            }
        }
    

    我现在可以在 RabbitMQ 管理屏幕中看到消息正在以非常高的速率传递,但它们并没有以该速率得到确认。如果我然后杀死我的消费者,在大约 30 秒内,所有那些未确认的消息都将移回就绪队列。我将进行的改进之一是设置 basicQos 值:channel.basicQos(10);,这样就不会传递太多消息但未确认。这是可取的,因为这意味着我可以在同一个队列中启动另一个消费者并开始处理队列,而不是最终在内存中未确认且对其他消费者不可用。

    【讨论】:

    • @robthewolf 如果我没记错的话,这解决了内存泄漏问题,因为没有将所有消息发送给消费者,消费者将它们存储在内存映射中,消耗越来越多的内存。通过将通道设置为自动确认,一次只有一条消息保存在 JVM 内存中,其余消息存储在 Rabbit Queue 中。除了解决内存泄漏之外,这还可以添加更多的消费者来处理负载,当他们完成处理前一个消息时,他们将拉取消息。
    • 您说“将频道设置为自动确认”是指您将其设置为不自动确认
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-06-12
    • 2011-07-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-01-03
    相关资源
    最近更新 更多