【发布时间】:2023-07-04 15:34:01
【问题描述】:
我有一个 .NET Core 控制台应用程序,它从 RabbitMQ 读取消息并将数据保存到数据库中。它使用 RabbitMQ.Client 程序集 5.1.0 并像这样设置 EventingConsumer:
var factory = new ConnectionFactory
{
HostName = _hostName,
UserName = _userName,
Password = _password,
RequestedHeartbeat = 20,
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.BasicQos(0, prefetchCount, false);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += HandleMessage;
_consumerTag = _channel.BasicConsume(_queueName, false, consumer);
如果我在 HandleMessage 方法内的消息上调用 _channel.BasicAck,即,一旦收到每条消息,传递的消息速率约为 1500 条/秒。但是,我想等待确认消息,直到它保存到数据库中。如果我这样做,速度会下降到 300-500/秒。
保存到数据库是在单独的线程上完成的,不是瓶颈。 HandleMessage 仅将消息存储在内存中,以便稍后保存在另一个线程上。我尝试过使用从 100 到 100,000 的各种 prefetchCount 值,但这似乎并不重要。如果我分析应用程序,我可以看到 AMQP 会话线程(“WorkPool-Session#1:Connection(...)”大部分时间都在等待 RabbitMQ.Client.ConsumerWorkService+WorkPool.Loop() 中的 WaitHandle
我做错了什么?如何在不立即确认消息的情况下更快地使用消息? (服务器是RabbitMQ 3.7.7)
【问题讨论】: