【问题标题】:RabbitMQ consumer slow when there are un-ACKed messages outstanding当有未确认的未确认消息时,RabbitMQ 消费者变慢
【发布时间】:2023-07-04 15:34:01
【问题描述】:

我有一个 .NET Core 控制台应用程序,它从 RabbitMQ 读取消息并将数据保存到数据库中。它使用 RabbitMQ.Client 程序集 5.1.0 并像这样设置 EventingConsumer:

var factory = new ConnectionFactory
{
    HostName = _hostName,
    UserName = _userName,
    Password = _password,
    RequestedHeartbeat = 20,
    AutomaticRecoveryEnabled = true,
    NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
};

_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.BasicQos(0, prefetchCount, false);

var consumer = new EventingBasicConsumer(_channel);
consumer.Received += HandleMessage;
_consumerTag = _channel.BasicConsume(_queueName, false, consumer);

如果我在 HandleMessage 方法内的消息上调用 _channel.BasicAck,即,一旦收到每条消息,传递的消息速率约为 1500 条/秒。但是,我想等待确认消息,直到它保存到数据库中。如果我这样做,速度会下降到 300-500/秒。

保存到数据库是在单独的线程上完成的,不是瓶颈。 HandleMessage 仅将消息存储在内存中,以便稍后保存在另一个线程上。我尝试过使用从 100 到 100,000 的各种 prefetchCount 值,但这似乎并不重要。如果我分析应用程序,我可以看到 AMQP 会话线程(“WorkPool-Session#1:Connection(...)”大部分时间都在等待 RabbitMQ.Client.ConsumerWorkService+WorkPool.Loop() 中的 WaitHandle

我做错了什么?如何在不立即确认消息的情况下更快地使用消息? (服务器是RabbitMQ 3.7.7)

【问题讨论】:

    标签: .net-core rabbitmq


    【解决方案1】:

    prefetchCount 限制将传递给您的消费者的未确认消息的数量。您可以增加此值,以便在不确认的情况下从队列中接收更多消息。

    但是,由于似乎数据库持久性是瓶颈,我希望交付率保持不变: 假设您需要 2 毫秒来完成每条消息的数据库插入,即 500 次插入/秒。一旦达到未完成消息的数量(预取计数),您将以 500 条消息/秒的速度进行 ACK,因此您将以该速度收到新消息。缓冲区大小对于瓶颈来说并不重要。

    为了提高系统吞吐量,您可以拥有额外的消费者,或以某种方式提高对数据库的吞吐量(即批量插入、模式改进、分片)等,但 RabbitMQ 无法坚持无限数量的未确认(进行中)消息。

    【讨论】:

    • 数据库持久化不是瓶颈。它已经批量完成,当 MQ 阅读器读取速度足够快时,可以在 ~0.2-0.3 毫秒/消息内完成。无论如何,它是在一个单独的线程上完成的,我可以设置一个非常高的完美限制(比如 100K),它不会在很长一段时间内受到影响,但传递的消息速度仍然会减慢。从本质上讲,这是我试图理解的奇怪行为:如果未确认消息,即使未达到预取限制,传递率也会低得多。
    • 我什至尝试使用具有各种值的简单 Thread.Sleep() 替换保存到数据库的代码,但这对 RabbitMQ 的交付率没有影响。速率也不是恒定的:有时会高达 1800/秒,有时会下降到 150/秒。
    • @EM0 你有没有在这个问题上取得任何进展?
    • @MNilson 否。顺便说一句,我遇到过消息被读取的速率是瓶颈的情况,正如 MA Hanin 在这里所说的那样,但我认为这并不能解释预取的情况还没有达到限制,但是没有自动确认的获取速度要比使用自动确认慢得多。