【问题标题】:Guaranteed publishing of messages on RabbitMQ on network loss保证在网络丢失时在 RabbitMQ 上发布消息
【发布时间】:2016-12-22 08:56:32
【问题描述】:

我正在使用 RabbitMQ .NET 客户端,当网络断开连接时,我们的服务正在丢失消息。 我尝试编写一个测试应用程序并使用“BasicAcks”事件并重新发送在断开连接发生时没有得到确认的每条消息,但它仍然丢失消息。使用 ConnectionShutdown 事件检测到断开连接(查找回复代码“451”)。 为了检查收到的消息,我消费了所有消息并阅读了内容,其中至少应包含从 0 到 49999 的每个数字一次。 当网络稳定时,它工作得很好。在模拟不稳定的网络(禁用网络适配器)时,有时会丢失多达数百条消息。

代码如下:

private static ConcurrentQueue<byte[]> sendQueue = new ConcurrentQueue<byte[]>();
private static ConcurrentDictionary<ulong, byte[]> waitingForAck = new ConcurrentDictionary<ulong, byte[]>();
private static bool stop;

private static void Main()
{
    var server = "192.168.1.123";
    var userName = "rabbitmq";
    var password = "rabbitmq";
    var sendCount = 50000;
    try
    {
        Task.Run(() => Send(server, userName, password, "TestExchange", sendCount));

        Task.Run(() =>
        {
            while (true)
            {
                Console.WriteLine("Total sent:{0}", totalPackages.Count);
                Console.WriteLine("Packages waiting in send queue:{0}", sendQueue.Count);
                Console.WriteLine("Packages waiting for ack:{0}", waitingForAck.Count);
                Console.WriteLine();

                if (stop)
                {
                    break;
                }
                Thread.Sleep(1000);
            }
        });

        Console.ReadLine();

        stop = true;
    }
    catch (Exception exception)
    {
        Console.WriteLine("Exception: {0}", exception.Message);
    }

    Console.ReadLine();
}

public static void Send(string server, string userName, string password, string exchangeName, int sendCount)
{
    for (int i = 0; i < sendCount; i++)
    {
        var content = String.Format("Hello World: {0}", i);
        var data = Encoding.UTF8.GetBytes(content);
        sendQueue.Enqueue(data);
    }

    var factory = new ConnectionFactory { HostName = server, UserName = userName, Password = password };
    factory.AutomaticRecoveryEnabled = true;
    using (var connection = factory.CreateConnection())
    {
        connection.ConnectionShutdown += Connection_ConnectionShutdown;

        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);

            channel.ConfirmSelect();

            channel.BasicAcks += Channel_BasicAcks;

            while (!stop)
            {
                byte[] data;
                if (!sendQueue.TryDequeue(out data))
                {
                    Thread.Sleep(100);
                    continue;
                }
                if (data == null)
                {
                    continue;
                }
                    var publishTag = channel.NextPublishSeqNo;

                    try
                    {
                        if (!waitingForAck.TryAdd(publishTag, data))
                        {
                            Console.WriteLine("Cannot prepare {0}", publishTag);
                        }

                        channel.BasicPublish(exchangeName, string.Empty, null, data);
                        totalPackages.Enqueue(data);
                    }
                    catch (Exception)
                    {
                        byte[] ignored;
                        if (!waitingForAck.TryRemove(publishTag, out ignored))
                        {
                            Console.WriteLine("cannot delete - exception");
                        }
                        Console.WriteLine("Requeue {0}", publishTag);
                        sendQueue.Enqueue(data);
                        Thread.Sleep(1000);
                        continue;
                    }

            }

            while (waitingForAck.Count > 0)
            {
                Thread.Sleep(1000);
                Console.WriteLine("Waiting for missing acks");
            }
        }
    }            
}

private static void Channel_BasicAcks(object sender, BasicAckEventArgs e)
{
    byte[] ignored;
    if (e.Multiple)
    {
        var ids = waitingForAck.Keys.Where(x => x <= e.DeliveryTag).ToArray();
        foreach (var id in ids)
        {
            if (!waitingForAck.TryRemove(id, out ignored))
            {
                Console.WriteLine("cannot delete {0}", id);
            }
        }
    }
    else
    {
        if (!waitingForAck.TryRemove(e.DeliveryTag, out ignored))
        {
            Console.WriteLine("cannot delete {0}", e.DeliveryTag);
        }
    }
}

private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
    if (e.ReplyCode == 541)
    {
        var temp = waitingForAck.Values.ToList();
        waitingForAck.Clear();
        Console.WriteLine("Connection lost, requeue {0} messages", temp.Count);
        foreach (var message in temp)
        {
            sendQueue.Enqueue(message);
        }
    }
}

谁能告诉我我做错了什么?

【问题讨论】:

  • 发布的代码只是测试工具,而不是您的实际 RabbitMQ 代码。发布您的 RabbitMQ 代码(Send(...) 方法的内容)可能会有所帮助
  • 只需向下滚动代码,就在其中。

标签: c# .net rabbitmq


【解决方案1】:

你需要声明一个持久队列:

channel.QueueDeclare(queue: "hello", durable: true);

this 示例。

【讨论】:

  • 已经有一个持久队列绑定到交换器,接收所有消息。正如我所说,当网络连接稳定时它工作正常..
【解决方案2】:

好的,问题不在于发送部分,而在于接收部分。我在检查每条消息末尾的数字“i”时出错String.Format("Hello World: {0}", i);

上面的代码运行良好,所有消息都至少发送一次。

【讨论】:

    猜你喜欢
    • 2017-07-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-11-06
    • 1970-01-01
    • 2016-03-31
    • 1970-01-01
    相关资源
    最近更新 更多