【发布时间】:2017-11-08 13:49:37
【问题描述】:
我在 RabbitMQ 中添加了 EventingBasicConsumer 的接收事件处理程序。我正在尝试检查队列是否已被消耗(已处理并且现在为空),它应该关闭消费者和连接。我找不到可以判断队列是否已处理的条件。
请帮忙
public void ProcessQueue(string queueName, Func<string, bool> ProcessMessage)
{
//lock (this.Model)
{
this.Model.BasicQos(0, 1, false);
EventingBasicConsumer consumer = new EventingBasicConsumer(this.Model);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
bool processed = ProcessMessage.Invoke(message);
if (processed)
this.SendAcknowledgement(ea.DeliveryTag);
else
this.StopProcessingQueue(consumer.ConsumerTag);
// Check if no message for next 2 minutes,
// Stop Consumer and close connection
};
this.Model.BasicConsume(queue: queueName,
autoAck: false,
consumer: consumer);
}
}
【问题讨论】:
-
我们有一个应用程序可以监控队列长度并自动增加或减少消费者数量。也许你想要这样的东西? stackoverflow.com/questions/1038318/…