【问题标题】:Wait until a message has been received and pass the message body using C# RabbitMQ等到收到消息并使用 C# RabbitMQ 传递消息正文
【发布时间】:2017-11-16 12:59:17
【问题描述】:

我想从队列中读取消息,并且一旦可用就想将字节[]发送到消费者类之外。

public byte[] Receive()
{
    if (messagingAdapter == null)
        return default(byte[]);

    byte[] messageBody = null;
    var channel = messagingAdapter.GetChannel();
    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    using (var subscription = new Subscription(channel, containerName, false))
    {
        while (channel.IsOpen)
        {
            var success = subscription.Next(5000, out BasicDeliverEventArgs eventArgs);
            if (success == false) continue;
            messageBody = eventArgs.Body;
            channel.BasicAck(eventArgs.DeliveryTag, false);
        }
    }
    return messageBody;
}

在上面的代码中,有两个问题(可以更多)。

1) 即使在写入 prefetchCount = 1 之后,它仍然会读取所有消息。 2) 等了 5 秒后,我一直没有成功,我无法将身体送到外面。

我又写了一段代码,它做同样的事情,但在帖子本身中,写的是不推荐这样做的方式。

示例代码:

using (var signal = new ManualResetEvent(false))
{
    var consumer = new EventingBasicConsumer(channel);

    consumer.Received +=

    (sender, args) =>
    {
        messageBody = args.Body;
        signal.Set();
    };
    //// start consuming
    channel.BasicConsume(containerName, true, consumer);
    // wait until message is received or timeout reached
    bool timeout = !signal.WaitOne(TimeSpan.FromSeconds(10));
    // cancel subscription
    channel.BasicCancel(consumer.ConsumerTag);
    if (timeout)
    {
        // timeout reached - do what you need in this case
        throw new Exception("timeout");
    }
    return messageBody;
    // at this point messageBody is received
}

【问题讨论】:

  • 它看起来是否有相当多的消息超过响应时间。所以问题在于 BasicQos 设置。

标签: c# multithreading rabbitmq


【解决方案1】:

尝试使用channel.BasicGet 阅读,如下所示:

private byte[] ReadRabbitMsg(IModel channel, string queue)
{
    if (channel.MessageCount(queue) == 0) return null;
    BasicGetResult result = channel.BasicGet(queue, true);
    if (result == null) return null;
    else
    {
        IBasicProperties props = result.BasicProperties;
        byte[] buff = result.Body;
    }
}

【讨论】:

  • 这个我已经做了,但是想用Subscription来实现。
  • 好吧,那我一定是误解了这个问题。您的两个问题似乎是您无法逐一处理消息并且处理永远不会退出?自从我对此进行性能测试以来已经有一段时间了,但是 1-by-1 读取实际上相当快,无论如何满足我的需要。
猜你喜欢
  • 1970-01-01
  • 2015-03-23
  • 1970-01-01
  • 2020-07-28
  • 2020-02-18
  • 2014-02-17
  • 2015-10-02
  • 2021-09-27
相关资源
最近更新 更多