【问题标题】:RabbitMQ 3.5 and Message PriorityRabbitMQ 3.5 和消息优先级
【发布时间】:2015-03-23 21:46:16
【问题描述】:

RabbitMQ 3.5 现在 supports message priority; 但是,我无法构建一个工作示例。我把我的代码放在下面。它包括我期望的输出和我实际的输出。我会对更多文档和/或工作示例感兴趣。

简而言之,我的问题是:如何在 Rabbit 3.5.0.0 中获得消息优先级?

出版商:

using System;
using RabbitMQ.Client;
using System.Text;
using System.Collections.Generic;

class Publisher
{

    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                IDictionary <String , Object> args = new Dictionary<String,Object>() ;
                args.Add(" x-max-priority ", 10);
                channel.QueueDeclare("task_queue1", true, false, true, args);

                for (int i = 1 ; i<=10; i++ )
                {
                    var message = "Message";
                    var body = Encoding.UTF8.GetBytes(message + " " + i);
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.Priority = Convert.ToByte(i);
                    channel.BasicPublish("", "task_queue1", properties, body);
                }
            }
        }
    }
}

消费者:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
using System.Collections.Generic;

namespace Consumer
{ 
    class Worker
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    IDictionary<String, Object> args = new Dictionary<String, Object>();                      
                    channel.BasicQos(0, 1, false);
                    var consumer = new QueueingBasicConsumer(channel);
                    IDictionary<string, object> consumerArgs = new Dictionary<string, object>();
                    channel.BasicConsume( "task_queue1", false, "", args, consumer);
                    Console.WriteLine(" [*] Waiting for messages. " +
                                      "To exit press CTRL+C");
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] Received {0}", message);
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                }
            }
        }
    }
}

实际输出:

[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 1
[x] Received Message 2
[x] Received Message 3
[x] Received Message 4
[x] Received Message 5
[x] Received Message 6
[x] Received Message 7
[x] Received Message 8
[x] Received Message 9
[x] Received Message 10

预期输出:

[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 10
[x] Received Message 9
[x] Received Message 8
[x] Received Message 7
[x] Received Message 6
[x] Received Message 5
[x] Received Message 4
[x] Received Message 3
[x] Received Message 2
[x] Received Message 1

更新 #1。 我在 Java here 中找到了一个示例。然而,它是 Rabbit 3.4.x.x。已合并到 3.5 中的插件。 我能看到的唯一区别是它们将优先级表示为一个 int 而我的是一个字节。但我觉得这是一个红鲱鱼。我在这里有点不知所措。

【问题讨论】:

    标签: c# rabbitmq


    【解决方案1】:

    好吧,我解决了。 这是一个愚蠢的错误。 我写道:

    args.Add(" x-max-priority ", 10);
    

    应该是的

    args.Add("x-max-priority", 10);
    

    我会保留这个,以便其他人可以在 C# 中获得 Rabbitmq 3.5 的优先级队列的工作示例。

    【讨论】:

      【解决方案2】:

      Node JS 中类似 RabbitMq 优先队列的实现

      安装 amqplib

      为了测试,我们需要安装 amqplib

      npm install amqplib
      

      发布者(send.js)

      #!/usr/bin/env node
      
      var amqp = require('amqplib/callback_api');
      
      function bail(err, conn) {
        console.error(err);
        if (conn) conn.close(function() { process.exit(1); });
      }
      
      function on_connect(err, conn) {
        if (err !== null) return bail(err);
      
        // name of queue
        var q = 'hello';
        var msg = 'Hello World!';
        var priorityValue = 0;
      
        function on_channel_open(err, ch) {
          if (err !== null) return bail(err, conn);
          // maxPriority : max priority value supported by queue
          ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
            if (err !== null) return bail(err, conn);
      
            for(var index=1; index<=100; index++) {
                   priorityValue = Math.floor((Math.random() * 10));
                   msg = 'Hello World!' + ' ' + index + ' ' + priorityValue;
                   ch.publish('', q, new Buffer(msg), {priority: priorityValue});
                   console.log(" [x] Sent '%s'", msg);
            }
      
            ch.close(function() { conn.close(); });
          });
        }
      
        conn.createChannel(on_channel_open);
      }
      
      amqp.connect(on_connect);
      

      订阅者(receive.js)

      #!/usr/bin/env node
      
      var amqp = require('amqplib/callback_api');
      
      function bail(err, conn) {
        console.error(err);
        if (conn) conn.close(function() { process.exit(1); });
      }
      
      function on_connect(err, conn) {
        if (err !== null) return bail(err);
        process.once('SIGINT', function() { conn.close(); });
      
        var q = 'hello';
      
        function on_channel_open(err, ch) {
          ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
            if (err !== null) return bail(err, conn);
            ch.consume(q, function(msg) { // message callback
              console.log(" [x] Received '%s'", msg.content.toString());
            }, {noAck: true}, function(_consumeOk) { // consume callback
              console.log(' [*] Waiting for messages. To exit press CTRL+C');
            });
          });
        }
      
        conn.createChannel(on_channel_open);
      }
      
      amqp.connect(on_connect);
      

      运行:

      node send.js
      

      它将创建一个名为“hello”的队列,并使用默认的 AMQP 交换向其充斥“1000”个示例消息。

      node receive.js
      

      它将作为消费者订阅队列中等待的消息。

      【讨论】:

        【解决方案3】:

        另一种可能性(对于未来的搜索者)

        消息传递的“推送”方法似乎不尊重优先级。

        http://rabbitmq.docs.pivotal.io/35/rabbit-web-docs/dotnet-api-guide.html.html

        下面是从上面的 URL 中引用的。我已将重要部分加粗。

        通过订阅检索消息(“推送 API”)

        另一种接收消息的方法是使用 IBasicConsumer 接口设置订阅。 然后,消息将在到达时自动传递,而不必主动请求。 实现消费者的一种方法是使用便利类 EventingBasicConsumer,它将传递和其他消费者生命周期事件作为 C# 分派事件:

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (ch, ea) =>
                        {
                            var body = ea.Body;
                            // ... process the message
                            ch.BasicAck(ea.DeliveryTag, false);
                        };  
        String consumerTag = channel.BasicConsume(queueName, false, consumer);
        

        通过更改为“拉”方法,优先级似乎得到了尊重。但是,在下面的引用(来自上面的同一个网址)中,似乎有一个权衡(我已经粗体

        获取单个消息(“pull API”) 要检索单个消息,请使用 IModel.BasicGet。返回值为BasicGetResult的一个实例,可以从中提取头信息(属性)和消息体:

        bool noAck = false;
        BasicGetResult result = channel.BasicGet(queueName, noAck);
        if (result == null) {
            // No message available at this time.
        } else {
            IBasicProperties props = result.BasicProperties;
            byte[] body = result.Body;
            ...
        

        由于上面的 noAck = false,您还必须调用 IModel.BasicAck 以确认您已成功接收并处理了消息:

            ...
            // acknowledge receipt of the message
            channel.BasicAck(result.DeliveryTag, false);
        }
        

        请注意,使用此 API 获取消息的效率相对较低。如果您希望 RabbitMQ 将消息推送到客户端,请参阅下一节。

        (本例中的“下一个”部分会将您带到本文顶部的“推送”方法)

        【讨论】:

          猜你喜欢
          • 2012-05-31
          • 2015-09-04
          • 1970-01-01
          • 1970-01-01
          • 2012-09-04
          • 2015-12-23
          • 2015-02-11
          • 2020-04-20
          • 2019-01-19
          相关资源
          最近更新 更多