【发布时间】:2017-11-16 12:59:17
【问题描述】:
我想从队列中读取消息,并且一旦可用就想将字节[]发送到消费者类之外。
public byte[] Receive()
{
if (messagingAdapter == null)
return default(byte[]);
byte[] messageBody = null;
var channel = messagingAdapter.GetChannel();
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
using (var subscription = new Subscription(channel, containerName, false))
{
while (channel.IsOpen)
{
var success = subscription.Next(5000, out BasicDeliverEventArgs eventArgs);
if (success == false) continue;
messageBody = eventArgs.Body;
channel.BasicAck(eventArgs.DeliveryTag, false);
}
}
return messageBody;
}
在上面的代码中,有两个问题(可以更多)。
1) 即使在写入 prefetchCount = 1 之后,它仍然会读取所有消息。 2) 等了 5 秒后,我一直没有成功,我无法将身体送到外面。
我又写了一段代码,它做同样的事情,但在帖子本身中,写的是不推荐这样做的方式。
示例代码:
using (var signal = new ManualResetEvent(false))
{
var consumer = new EventingBasicConsumer(channel);
consumer.Received +=
(sender, args) =>
{
messageBody = args.Body;
signal.Set();
};
//// start consuming
channel.BasicConsume(containerName, true, consumer);
// wait until message is received or timeout reached
bool timeout = !signal.WaitOne(TimeSpan.FromSeconds(10));
// cancel subscription
channel.BasicCancel(consumer.ConsumerTag);
if (timeout)
{
// timeout reached - do what you need in this case
throw new Exception("timeout");
}
return messageBody;
// at this point messageBody is received
}
【问题讨论】:
-
它看起来是否有相当多的消息超过响应时间。所以问题在于 BasicQos 设置。
标签: c# multithreading rabbitmq