一. 生产者-确认机制
1. Confirm模式
(1). 含义:就是应答模式,生产者发送一条消息之后,Rabbitmq服务器做了个响应,表示收到了。
(2). 特点:异步模式,在应之前,可以继续发送消息,单条消息、批量消息均可继续发送。
(3). 核心代码:单条消息确认: channel.waitForConfirms()
批量消息确认: channel.waitForConfirmsOrDie()
异步监听消息确认:channel.addConfirmListener()
PS: 大致流程:channel.ConfirmSelect();开启确认模式→发送消息→提供一个回执方法WaitForConfirms(); 返回一个bool 值
代码分享:
/// <summary> /// 生产者-Confirm模式 /// </summary> public class ProductionConfirm { public static void Show() { Console.ForegroundColor = ConsoleColor.Red; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服务在本地运行 factory.UserName = "guest";//用户名 factory.Password = "guest";//密码 using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("-------------------------生产者准备就绪-----------------------------"); channel.QueueDeclare(queue: "ConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.ExchangeDeclare(exchange: "ConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); channel.QueueBind(queue: "ConfirmQueue", exchange: "ConfirmExchange", routingKey: "ConfirmSelectKey"); string message = ""; //在控制台输入消息,按enter键发送消息 while (!message.Equals("stop", StringComparison.CurrentCultureIgnoreCase)) { Console.WriteLine("请输入要发送的消息:"); message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); try { //开启消息确认模式 channel.ConfirmSelect(); //发送消息 channel.BasicPublish(exchange: "ConfirmExchange", routingKey: "ConfirmSelectKey", basicProperties: null, body: body); if (channel.WaitForConfirms()) //单条消息确认 { //表示消息发送成功(已经存入队列) Console.WriteLine($"【{message}】发送到Broke成功!"); } else { //表示消息发送失败 } //channel.WaitForConfirmsOrDie();//如果所有消息发送成功 就正常执行, 如果有消息发送失败;就抛出异常; } catch (Exception) { //表示消息发送失败 Console.WriteLine($"【{message}】发送到Broker失败!"); } } } } Console.ReadKey(); } }