【发布时间】:2021-02-23 20:03:25
【问题描述】:
我正在尝试学习 RabbitMQ,它是 .NET api。我设置了两个控制台应用程序,一个是发布者,一个是消费者。这是发布者:
class Publisher
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() {HostName = "localhost"};
using var connection = factory.CreateConnection();
for (int i = 0; i < 100; i++)
{
Thread.Sleep(100);
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "My test Queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "My test Queue",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
消费者:
class Consumer
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
Thread.Sleep(1000);
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: "My test Queue",
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
所以发布者发布了 100 条消息,每条消息之间有一个小的延迟(100 毫秒)。在消费者中,我有更大的延迟,1000 毫秒。我期望看到的是队列很快就会被填满,而消费者则在慢慢处理它,每秒处理一条消息,直到队列为空。
相反,只要我运行消费者,队列就会立即清空,并且在控制台中我看到 [x] Received {0} 每秒记录一次。
我录制了一个 GIF,你可以看到队列为 100,我启动消费者,然后队列立即为 0。但是,你可以看到消费者的动作每秒发生一次。
Consumer 是否只是完全清空了队列,并且一次处理一条消息而没有内存?如果是这样,我怎样才能让消费者在最后一个动作完成之前不要从队列中抓取另一个项目?
【问题讨论】:
-
尝试发布一千个。消费者有一个缓冲区,它在本地拉下以提高性能。 IIRC 它默认为 100 条消息。它们位于本地队列中,并在消息被消费时被加满(如果从未处理,则返回到队列中)
-
[如果它是,当你实例化它时,它很可能是通道上的一个参数......
channel.basic_qos(prefetch_count=100)。我一直在使用其他语言的客户端,所以不能确定] -
使用这里描述的异步消费者stackoverflow.com/a/47694632/13842370
-
... 并将
Thread.Sleep(1000)替换为await Task.Delay(1000)