在我们平时的项目开发中很多时候都需要用到这个功能的地方,比如自动取消一些未及时进行支付的订单,还有就是一些准备自动上架的商品。(RabbitMQ下载官网:https://www.rabbitmq.com/

下面简单个例子,如下图:

C#调用RabbitMQ实现消息队列

下面是RabbitMQ的大概结构图:

C#调用RabbitMQ实现消息队列

下面是它的工作原理:

组成部分说明如下:

  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
  • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

消息发布接收流程:

-----发送消息-----
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)

----接收消息-----
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。

下面是实现向RabbitMQ服务器发送消息,代码如下

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

static void Main(string[] args)

{

    var factory = new ConnectionFactory();

    factory.HostName = "localhost";//主机名,Rabbit会拿这个IP生成一个endpoint,这个很熟悉吧,就是socket绑定的那个终结点。

    factory.UserName = "guest";//默认用户名,用户可以在服务端自定义创建,有相关命令行

    factory.Password = "guest";//默认密码

 

    using (var connection = factory.CreateConnection())//连接服务器,即正在创建终结点。

    {

        //创建一个通道,这个就是Rabbit自己定义的规则了,如果自己写消息队列,这个就可以开脑洞设计了

        //这里Rabbit的玩法就是一个通道channel下包含多个队列Queue

        using (var channel = connection.CreateModel())

        {

             channel.QueueDeclare("kibaQueue", false, false, false, null);//创建一个名称为kibaqueue的消息队列

             var properties = channel.CreateBasicProperties();

             properties.DeliveryMode = 1;

             string message = "I am Kiba518"; //传递的消息内容

             channel.BasicPublish("", "kibaQueue", properties, Encoding.UTF8.GetBytes(message)); //生产消息

             Console.WriteLine($"Send:{message}");

        }

    }

}

运行代码。

然后我们使用命令行rabbitmqctl list_queues,去RabbitMQ的服务器查看当前消息队列,如下图:

C#调用RabbitMQ实现消息队列

可以看到,我们的消息已经发送成功了。

下面是现在我们编写接收消息代码,如下

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

static void Main(string[] args)

{

    var factory = new ConnectionFactory();

    factory.HostName = "localhost";

    factory.UserName = "guest";

    factory.Password = "guest";

 

    using (var connection = factory.CreateConnection())

    {

        using (var channel = connection.CreateModel())

        {

            channel.QueueDeclare("kibaQueue", false, false, false, null);

 

            /* 这里定义了一个消费者,用于消费服务器接受的消息

             * C#开发需要注意下这里,在一些非面向对象和面向对象比较差的语言中,是非常重视这种设计模式的。

             * 比如RabbitMQ使用了生产者与消费者模式,然后很多相关的使用文章都在拿这个生产者和消费者来表述。

             * 但是,在C#里,生产者与消费者对我们而言,根本算不上一种设计模式,他就是一种最基础的代码编写规则。

             * 所以,大家不要复杂的名词吓到,其实,并没那么复杂。

             * 这里,其实就是定义一个EventingBasicConsumer类型的对象,然后该对象有个Received事件,

             * 该事件会在服务接收到数据时触发。

             */

            var consumer = new EventingBasicConsumer(channel);//消费者

            channel.BasicConsume("kibaQueue", true, consumer);//消费消息

            consumer.Received += (model, ea) =>

            {

                var body = ea.Body;

                var message = Encoding.UTF8.GetString(body);

            };

        }

    }

}

运行代码。

然后我们使用命令行rabbitmqctl list_queues,去RabbitMQ的服务器查看当前消息队列,如下图:

C#调用RabbitMQ实现消息队列

可以看到,消息已经被使用了。

注意:如何过你安装RabbitMQ.Client版本是6.0.0.0时,在获取EventingBasicConsumer消费者数据时要使用ea.Body.ToArray()(byte[] body = ea.Body.ToArray();//获取数据)

分类:

技术点:

相关文章: