【发布时间】:2023-03-14 18:19:01
【问题描述】:
我正在尝试设置 RabbitMQ,我可以在其中选择将消息作为扇出或直接发布到服务。但是,当我发布到 fanout 交换时,我看到消息已传递到所有服务,但也以循环方式传递。因此,其中一项服务总是两次看到相同的消息。
这是一个完整的复制:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace rabbitmq_exchanges_repro
{
class Program
{
static void Main(string[] args)
{
var hostName = "localhost";
var factory = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
HostName = hostName,
};
var connection = factory.CreateConnection();
var model = connection.CreateModel();
var serviceName = "service1";
// This queue is for round-robin messages distributed to instances of the service with the specified service name.
var directExchangeName = $"{serviceName}-direct";
model.QueueDeclare(
serviceName,
durable: true,
exclusive: false,
autoDelete: false);
model.ExchangeDeclare(
exchange: directExchangeName,
type: "direct",
durable: true,
autoDelete: false);
model.QueueBind(
queue: serviceName,
exchange: directExchangeName,
routingKey: string.Empty);
// This is for fanout messages distributed to all services with the specified service name.
var fanoutExchangeName = $"{serviceName}-fanout";
model.ExchangeDeclare(
exchange: fanoutExchangeName,
type: "fanout",
durable: true,
autoDelete: false);
var fanoutQueueName = model
.QueueDeclare()
.QueueName;
model.QueueBind(
queue: fanoutQueueName,
exchange: fanoutExchangeName,
routingKey: string.Empty);
var directConsumer = new EventingBasicConsumer(model);
var fanoutConsumer = new EventingBasicConsumer(model);
var workItemConsumerTag = model.BasicConsume(
queue: serviceName,
autoAck: true,
consumer: directConsumer);
var fanoutConsumerTag = model.BasicConsume(
queue: fanoutQueueName,
autoAck: true,
consumer: fanoutConsumer);
directConsumer.Received += (o, e) =>
{
Console.WriteLine("Received message (direct)");
};
fanoutConsumer.Received += (o, e) =>
{
Console.WriteLine("Received message (fanout)");
};
Console.WriteLine("[P]ublish");
Console.WriteLine("E[x]it");
var exit = false;
while (!exit)
{
var key = Console.ReadKey();
switch (key.Key)
{
case ConsoleKey.P:
model
.BasicPublish(
exchange: fanoutExchangeName,
routingKey: string.Empty,
body: new byte[] { 1, 2, 3 });
break;
case ConsoleKey.X:
exit = true;
break;
}
}
model.BasicCancel(workItemConsumerTag);
model.BasicCancel(fanoutConsumerTag);
model.Close();
model.Dispose();
connection.Close();
connection.Dispose();
}
}
}
在两个单独的控制台窗口中运行上述代码。如果您在一个窗口中按P,您会看到一个实例输出了我所期望的结果:
Received message (fanout)
但是另一个窗口输出这个:
Received message (fanout)
Received message (direct)
尽管PublishBasic 调用指定了扇出交换名称,但还是会这样做。这里发生了什么?我如何确保直接交换不涉及这种情况?
【问题讨论】:
-
我无法使用 RabbitMQ 3.7.14 和您的代码进行复制。我只在每个终端窗口中收到“收到的消息(扇出)”消息。也许 RabbitMQ 中有旧的绑定?您可以重置您的实例并重试吗?
-
@LukeBakken 实际上,我删除了我的 docker 容器并重新创建,现在它可以工作了。如果您将其添加为答案,我将很乐意接受。谢谢!
标签: c# .net .net-core rabbitmq