【问题标题】:.NET Core 1.1 Receiving RabbitMQ Messages.NET Core 1.1 接收 RabbitMQ 消息
【发布时间】:2017-07-31 21:35:22
【问题描述】:

这是一个简单的 .net core 1.1 控制台应用程序。使用 -r 参数调用它,它会读取 rabbitmq 队列中的所有消息,使用任何其他参数调用它,每个参数作为消息入队。

这是问题所在,我可以将消息排入队列,但所有读取消息的尝试都会导致没有消息被读取。显然我没有正确使用队列,希望得到一些指导。

谢谢!

    using System;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitMqDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            var client = new MessagingClient();
            if (args.Length == 1 && args[0].ToLower() == "-r")
            {
                Console.WriteLine("Reading Messages from Queue.");
                var messages = client.ReceiveMessages();
                Console.WriteLine($"Read {messages.Length} message(s) from queue.");
                foreach(var msg in messages)
                    Console.WriteLine(msg);
            }
            else
            {
                foreach (var msg in args)
                {
                    client.SendMessage(msg);
                }
                Console.WriteLine($"Enqueued {args.Length} Message.");
            }
        }
    }

    internal class MessagingClient
    {
        private readonly ConnectionFactory connectionFactory;
        private string ExchangeName => "defaultExchange";
        private string RoutingKey => "";
        private string QueueName => "Demo";

        private string HostName => "localhost";


        public MessagingClient()
        {
            this.connectionFactory = new ConnectionFactory {HostName = this.HostName};
        }

        public void SendMessage(string message)
        {
            using (var connection = this.connectionFactory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    this.QueueDeclare(channel, this.QueueName);
                    var properties = this.SetMessageProperties(channel, message);

                    string messageJson = JsonConvert.SerializeObject(message);
                    var body = Encoding.UTF8.GetBytes(messageJson);

                    channel.BasicPublish(exchange: this.ExchangeName, routingKey: this.RoutingKey, basicProperties: properties, body: body);
                }
            }
        }

        public string[] ReceiveMessages()
        {
            var messages = new List<string>();
            using (var connection = this.connectionFactory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    this.QueueDeclare(channel, this.QueueName);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);


                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {                        
                        string bodystring = Encoding.UTF8.GetString(ea.Body);
                        messages.Add(bodystring);

                        // ReSharper disable once AccessToDisposedClosure
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    channel.BasicConsume(queue: this.QueueName, autoAck: false, consumer: consumer);
                }
            }
            return messages.ToArray();
        }

        private void QueueDeclare(IModel channel, string queueName)
        {
            channel.ExchangeDeclare(ExchangeName, type: ExchangeType.Direct,
                durable: true,
                autoDelete: false,
                arguments: null);

            var queueDeclared = channel.QueueDeclare(queue: queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            channel.QueueBind(queueName, ExchangeName, RoutingKey);            
        }

        private IBasicProperties SetMessageProperties(IModel channel, object message)
        {
            var properties = channel.CreateBasicProperties();
            properties.ContentType = "application/json";
            properties.Persistent = true;
            return properties;
        }

    }
}

【问题讨论】:

    标签: rabbitmq .net-core


    【解决方案1】:
    • 首先,使用管理 UI 确保您的交换和队列设置正确并且消息已发布到其中。
    • 其次,ReceiveMessages(),因此您的阅读器可能会在事件有机会触发之前立即返回一个空数组。当消费者从 RabbitMQ 接收消息时,您无需等待任何代码。请注意the tutorial 中的Console.ReadLine() 是如何使用的。在您的示例中,您可以使用同步对象 (ManualResetEvent) 来防止 ReceiveMessages() 在读取特定消息计数之前返回。

    【讨论】:

    • 伙计,我觉得自己很愚蠢。当然它不会阻塞正在执行的线程。我添加了一些阻塞逻辑,它就像一个冠军。谢谢卢克!
    • 很高兴我能帮忙?
    猜你喜欢
    • 2017-03-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-05-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多