【发布时间】:2016-12-22 08:56:32
【问题描述】:
我正在使用 RabbitMQ .NET 客户端,当网络断开连接时,我们的服务正在丢失消息。 我尝试编写一个测试应用程序并使用“BasicAcks”事件并重新发送在断开连接发生时没有得到确认的每条消息,但它仍然丢失消息。使用 ConnectionShutdown 事件检测到断开连接(查找回复代码“451”)。 为了检查收到的消息,我消费了所有消息并阅读了内容,其中至少应包含从 0 到 49999 的每个数字一次。 当网络稳定时,它工作得很好。在模拟不稳定的网络(禁用网络适配器)时,有时会丢失多达数百条消息。
代码如下:
private static ConcurrentQueue<byte[]> sendQueue = new ConcurrentQueue<byte[]>();
private static ConcurrentDictionary<ulong, byte[]> waitingForAck = new ConcurrentDictionary<ulong, byte[]>();
private static bool stop;
private static void Main()
{
var server = "192.168.1.123";
var userName = "rabbitmq";
var password = "rabbitmq";
var sendCount = 50000;
try
{
Task.Run(() => Send(server, userName, password, "TestExchange", sendCount));
Task.Run(() =>
{
while (true)
{
Console.WriteLine("Total sent:{0}", totalPackages.Count);
Console.WriteLine("Packages waiting in send queue:{0}", sendQueue.Count);
Console.WriteLine("Packages waiting for ack:{0}", waitingForAck.Count);
Console.WriteLine();
if (stop)
{
break;
}
Thread.Sleep(1000);
}
});
Console.ReadLine();
stop = true;
}
catch (Exception exception)
{
Console.WriteLine("Exception: {0}", exception.Message);
}
Console.ReadLine();
}
public static void Send(string server, string userName, string password, string exchangeName, int sendCount)
{
for (int i = 0; i < sendCount; i++)
{
var content = String.Format("Hello World: {0}", i);
var data = Encoding.UTF8.GetBytes(content);
sendQueue.Enqueue(data);
}
var factory = new ConnectionFactory { HostName = server, UserName = userName, Password = password };
factory.AutomaticRecoveryEnabled = true;
using (var connection = factory.CreateConnection())
{
connection.ConnectionShutdown += Connection_ConnectionShutdown;
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null);
channel.ConfirmSelect();
channel.BasicAcks += Channel_BasicAcks;
while (!stop)
{
byte[] data;
if (!sendQueue.TryDequeue(out data))
{
Thread.Sleep(100);
continue;
}
if (data == null)
{
continue;
}
var publishTag = channel.NextPublishSeqNo;
try
{
if (!waitingForAck.TryAdd(publishTag, data))
{
Console.WriteLine("Cannot prepare {0}", publishTag);
}
channel.BasicPublish(exchangeName, string.Empty, null, data);
totalPackages.Enqueue(data);
}
catch (Exception)
{
byte[] ignored;
if (!waitingForAck.TryRemove(publishTag, out ignored))
{
Console.WriteLine("cannot delete - exception");
}
Console.WriteLine("Requeue {0}", publishTag);
sendQueue.Enqueue(data);
Thread.Sleep(1000);
continue;
}
}
while (waitingForAck.Count > 0)
{
Thread.Sleep(1000);
Console.WriteLine("Waiting for missing acks");
}
}
}
}
private static void Channel_BasicAcks(object sender, BasicAckEventArgs e)
{
byte[] ignored;
if (e.Multiple)
{
var ids = waitingForAck.Keys.Where(x => x <= e.DeliveryTag).ToArray();
foreach (var id in ids)
{
if (!waitingForAck.TryRemove(id, out ignored))
{
Console.WriteLine("cannot delete {0}", id);
}
}
}
else
{
if (!waitingForAck.TryRemove(e.DeliveryTag, out ignored))
{
Console.WriteLine("cannot delete {0}", e.DeliveryTag);
}
}
}
private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
{
if (e.ReplyCode == 541)
{
var temp = waitingForAck.Values.ToList();
waitingForAck.Clear();
Console.WriteLine("Connection lost, requeue {0} messages", temp.Count);
foreach (var message in temp)
{
sendQueue.Enqueue(message);
}
}
}
谁能告诉我我做错了什么?
【问题讨论】:
-
发布的代码只是测试工具,而不是您的实际 RabbitMQ 代码。发布您的 RabbitMQ 代码(
Send(...)方法的内容)可能会有所帮助 -
只需向下滚动代码,就在其中。