【问题标题】:RabbitMQ C# verify message was sentRabbitMQ C# 验证消息已发送
【发布时间】:2014-06-10 22:12:38
【问题描述】:

我是 RabbitMQ 的新手,正在尝试写入队列并验证消息是否已发送。如果它失败了,我需要知道它。 我做了一个假队列来观察它失败,但无论我看到什么都没有异常,当我在寻找一个确认时,我总是得到一个。我从没见过 BasicNack。

我什至不确定我是否是 BasicAcks 是要走的路。

    private void button1_Click(object sender, EventArgs e)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("task_queue", true, false, false, null);

                var message = ("Helllo world");
                var body = Encoding.UTF8.GetBytes(message);
                channel.ConfirmSelect();

                var properties = channel.CreateBasicProperties();
                properties.SetPersistent(true);
                properties.DeliveryMode = 2;
                channel.BasicAcks += channel_BasicAcks;
                channel.BasicNacks += channel_BasicNacks;
                //fake queue should be task_queue
                channel.BasicPublish("", "task_2queue", true, properties, body);

                channel.WaitForConfirmsOrDie();

                Console.WriteLine(" [x] Sent {0}", message);
            }
        }
    }

    void channel_BasicNacks(IModel model, BasicNackEventArgs args)
    {

    }

    void channel_BasicAcks(IModel model, BasicAckEventArgs args)
    {

    }

【问题讨论】:

    标签: c# rabbitmq


    【解决方案1】:

    对于那些寻找 C# 答案的人 - 这就是您所需要的。

    https://rianjs.net/2013/12/publisher-confirms-with-rabbitmq-and-c-sharp

    类似这样的:(BasicAcks 附加了一个事件处理程序 - 还有 BasicNacks)

    using (var connection = FACTORY.CreateConnection())
    {
        var channel = connection.CreateModel();
        channel.ExchangeDeclare(QUEUE_NAME, ExchangeType.Fanout, true);
        channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
        channel.QueueBind(QUEUE_NAME, QUEUE_NAME, String.Empty, new Dictionary<string, object>());
         channel.BasicAcks += (sender, eventArgs) =>
                    {
                        //implement ack handle
                    };
        channel.ConfirmSelect();
    
        for (var i = 1; i <= numberOfMessages; i++)
        {
            var messageProperties = channel.CreateBasicProperties();
            messageProperties.SetPersistent(true);
    
            var message = String.Format("{0}\thello world", i);
            var payload = Encoding.Unicode.GetBytes(message);
            Console.WriteLine("Sending message: " + message);
            channel.BasicPublish(QUEUE_NAME, QUEUE_NAME, messageProperties, payload);
            channel.WaitForConfirmsOrDie();
        }
    }
    

    【讨论】:

      【解决方案2】:

      你需要一个Publisher Confirms

      如您所见,您可以实现:

      交易:

      ch.txSelect(); <-- start transaction
      ch.basicPublish("", QUEUE_NAME,
                                  MessageProperties.PERSISTENT_BASIC,
                                  "nop".getBytes());
      ch.txCommit();<--commit transaction
      

      消息存储到队列和磁盘中。 这种方式可能会很慢,如果您需要性能,则不应使用它。

      您可以使用 Streaming Lightweight Publisher Confirms,使用:

      ch.setConfirmListener(new ConfirmListener() {
          public void handleAck(long seqNo, boolean multiple) {
              if (multiple) {
                  unconfirmedSet.headSet(seqNo+1).clear();
              } else {
                  unconfirmedSet.remove(seqNo);
              }
          }
          public void handleNack(long seqNo, boolean multiple) {
              // handle the lost messages somehow
          }
      

      希望对你有帮助

      【讨论】:

      • 轻量级更快吗?我确实需要我能得到的最好的表现。我真的不需要将它保存到 IO 而是保存在队列中。如果失败,我需要备用代码。
      • 是的,它更快。事务很慢,必须等待同步磁盘操作,并且发布被锁定,直到消息被存储。 BTW Lightweight Publisher Confirms 更加复杂,因为如果您不想丢失消息,您应该将消息完全映射为链接。
      • 我使用的是 channel.ConfirmSelect();和 channel.WaitForConfirmsOrDie(); with 被告知使用 return 事件。我能够以这种方式看到错误。那会比 ch.txSelect();/txCommit 更快吗?我将发送大量流量,例如每秒 10k,并寻求最佳性能建议。当然我不会每次都打开连接,但我只做了 1 天:)
      • WaitForConfirmsOrDie 比 txcommit 快,无论如何我建议使用 WaitForConfirmsOrDie(TIME_OUT),否则如果出现问题,您可能会永远阻止您的发布。如果您需要表演,则确认发布通常不是好的选择。因此,您可以尝试在多线程中打开更多通道并使用 WaitForConfirmsOrDie(TIME_OUT)。我认为你可以做出正确的妥协。告诉我
      • 您提供的流轻量级示例不在 c# 中,该操作正在使用的操作,您可以考虑编辑此答案并添加 c# 实现吗?
      【解决方案3】:

      好的,您总是会收到您发送的消息的 ACK,因为“每次消息都成功传递到默认 Exchange。”

      PS:您不会直接将消息发送到队列,一旦 Exchange 收到它给您 ACK 的消息,它就会使用路由键(如果有)将消息路由到所有绑定队列。

      【讨论】:

      • 这不是真的。在 vanilla AMQP 中,如果代理接受消息(请参阅basic.publish 方法文档),它不会响应任何内容。除非您使用Publish Confirms - 这就是提问者所需要的。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-03-14
      • 2016-12-25
      • 2016-07-13
      相关资源
      最近更新 更多