【问题标题】:Stop Rabbit MQ consumer Event If the Queue is Empty如果队列为空,则停止 Rabbit MQ 消费者事件
【发布时间】:2017-11-08 13:49:37
【问题描述】:

我在 RabbitMQ 中添加了 EventingBasicConsumer 的接收事件处理程序。我正在尝试检查队列是否已被消耗(已处理并且现在为空),它应该关闭消费者和连接。我找不到可以判断队列是否已处理的条件。

请帮忙

    public void ProcessQueue(string queueName, Func<string, bool> ProcessMessage)
    {
        //lock (this.Model)
        {
            this.Model.BasicQos(0, 1, false);
            EventingBasicConsumer consumer = new EventingBasicConsumer(this.Model);

            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                bool processed = ProcessMessage.Invoke(message);
                if (processed)
                    this.SendAcknowledgement(ea.DeliveryTag);
                else
                    this.StopProcessingQueue(consumer.ConsumerTag);

                // Check if no message for next 2 minutes, 
                //      Stop Consumer and close connection

            };

            this.Model.BasicConsume(queue: queueName,
                             autoAck: false,
                             consumer: consumer);
        }
    }

【问题讨论】:

标签: .net rabbitmq


【解决方案1】:

我只是创建一个被动队列来查看队列中有多少消息

 private static int passiveDeclareForMessageCount(IModel model)
    {

        Dictionary<string, object> args = new Dictionary<string, object>();
        args.Add("x-queue-mode", "lazy");
        int resultCount = 0;
        var response = model.QueueDeclarePassive(ConfigurationManager.AppSettings["LocalQueueName"].ToString());
        resultCount = (int)response.MessageCount;

        return resultCount;
    }

【讨论】:

    【解决方案2】:

    我也找不到任何属性,因此必须实现一个计时器,该计时器将在每次收到消息时重置,如果经过的时间超过 2 分钟,您可以触发一个清理方法,该方法将停止消费者并关闭连接

    【讨论】:

    • 正确,我现在用不同的方式实现了它
    【解决方案3】:

    如果队列为空,我还没有找到任何方法来停止 Rabbit MQ 消费者事件,因此我实现了以下方法,通过从 API 传递消息计数来处理消息

    "localhost:/api/queues"

    下面是处理消息直到队列为空的函数

    /// <summary>
    /// (Recommanded) Processes the queue till the number of messages provided.
    /// Added to manage the load (process batches by batches)
    /// </summary>
    /// <param name="queueName">Name of the queue.</param>
    /// <param name="ProcessMessage">The process message.</param>
    /// <param name="count">The count.</param>
    public uint ProcessQueueByMessageCount(string queueName, Func<string, bool> HowToProcessMessage, uint messageCount)
    {
        uint messagesToProcess = messageCount;
        using (var connect = this)
        {
            while (messageCount > 0)
            {
                BasicGetResult result = connect.Model.BasicGet(queueName, false);
                bool processed = HowToProcessMessage.Invoke(System.Text.Encoding.UTF8.GetString(result.Body));
                if (processed)
                {
                    this.SendAcknowledgement(result.DeliveryTag);
                    messageCount--;
                }
                else
                {
                    connect.Model.BasicNack(result.DeliveryTag, false, true);
                    break;
                }
            }
        }
        return messagesToProcess - messageCount;
    }
    

    【讨论】:

      猜你喜欢
      • 2022-07-03
      • 2016-10-25
      • 2015-02-14
      • 1970-01-01
      • 2013-08-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多