【问题标题】:Why is my RabbitMQ consumer (seemingly) consuming all messages at once?为什么我的 RabbitMQ 消费者(看似)一次消费所有消息?
【发布时间】:2021-02-23 20:03:25
【问题描述】:

我正在尝试学习 RabbitMQ,它是 .NET api。我设置了两个控制台应用程序,一个是发布者,一个是消费者。这是发布者:

class Publisher
{
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() {HostName = "localhost"};
        using var connection = factory.CreateConnection();
        for (int i = 0; i < 100; i++)
        {
            Thread.Sleep(100);
            using var channel = connection.CreateModel();
            channel.QueueDeclare(queue: "My test Queue",
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            string message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish(exchange: "",
                routingKey: "My test Queue",
                basicProperties: null,
                body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

消费者:

class Consumer
{
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += async (model, ea) =>
        {
            Thread.Sleep(1000);
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine(" [x] Received {0}", message);
        };
        channel.BasicConsume(queue: "My test Queue",
            autoAck: true,
            consumer: consumer);
        
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

所以发布者发布了 100 条消息,每条消息之间有一个小的延迟(100 毫秒)。在消费者中,我有更大的延迟,1000 毫秒。我期望看到的是队列很快就会被填满,而消费者则在慢慢处理它,每秒处理一条消息,直到队列为空。

相反,只要我运行消费者,队列就会立即清空,并且在控制台中我看到 [x] Received {0} 每秒记录一次。

我录制了一个 GIF,你可以看到队列为 100,我启动消费者,然后队列立即为 0。但是,你可以看到消费者的动作每秒发生一次。

Consumer 是否只是完全清空了队列,并且一次处理一条消息而没有内存?如果是这样,我怎样才能让消费者在最后一个动作完成之前不要从队列中抓取另一个项目?

【问题讨论】:

  • 尝试发布一千个。消费者有一个缓冲区,它在本地拉下以提高性能。 IIRC 它默认为 100 条消息。它们位于本地队列中,并在消息被消费时被加满(如果从未处理,则返回到队列中)
  • [如果它,当你实例化它时,它很可能是通道上的一个参数......channel.basic_qos(prefetch_count=100)。我一直在使用其他语言的客户端,所以不能确定]
  • 使用这里描述的异步消费者stackoverflow.com/a/47694632/13842370
  • ... 并将Thread.Sleep(1000) 替换为await Task.Delay(1000)

标签: c# .net rabbitmq


【解决方案1】:

注意:我根本不是 RabitMQ 专家,我只是尝试过它。

我认为有多种方法可以实现这一点。

手动确认

您的示例中最小的一步可能是关闭自动确认(设置autoAck: false)。

在自动确认模式下,消息在发送后立即被视为成功传递。

使用自动确认模式时需要考虑的另一件重要事情是消费者过载。手动确认模式通常与有界通道预取一起使用,这限制了通道上未完成(“进行中”)交付的数量。然而,对于自动确认,根据定义没有这样的限制。因此,消费者可能会对交付速度感到不知所措,可能会在内存中累积积压并耗尽堆或让他们的进程被操作系统终止。一些客户端库将应用 TCP 背压(停止从套接字读取,直到未处理交付的积压下降超过某个限制)。因此,仅向能够以稳定的速度高效处理交付的消费者推荐使用自动确认模式。

来源:https://www.rabbitmq.com/confirms.html#acknowledgement-modes
您可以在此处阅读有关预取的更多信息:https://www.rabbitmq.com/confirms.html#channel-qos-prefetch

然后您需要手动确认消息的处理:

// this example assumes an existing channel (IModel) instance

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body.ToArray();
                    // positively acknowledge a single delivery, the message will
                    // be discarded
                    channel.BasicAck(ea.DeliveryTag, false);
                };
String consumerTag = channel.BasicConsume(queueName, false, consumer);

来源:https://www.rabbitmq.com/confirms.html#consumer-acks-api-elements

替代方案

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-04-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-11-12
    • 1970-01-01
    • 1970-01-01
    • 2016-05-04
    相关资源
    最近更新 更多