【问题标题】:Why is publishing to fanout exchange also publishing to direct exchange?为什么发布到扇出交换也发布到直接交换?
【发布时间】:2023-03-14 18:19:01
【问题描述】:

我正在尝试设置 RabbitMQ,我可以在其中选择将消息作为扇出或直接发布到服务。但是,当我发布到 fanout 交换时,我看到消息已传递到所有服务,但也以循环方式传递。因此,其中一项服务总是两次看到相同的消息。

这是一个完整的复制:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace rabbitmq_exchanges_repro
{
    class Program
    {
        static void Main(string[] args)
        {
            var hostName = "localhost";
            var factory = new ConnectionFactory
            {
                AutomaticRecoveryEnabled = true,
                HostName = hostName,
            };

            var connection = factory.CreateConnection();
            var model = connection.CreateModel();

            var serviceName = "service1";

            // This queue is for round-robin messages distributed to instances of the service with the specified service name.
            var directExchangeName = $"{serviceName}-direct";
            model.QueueDeclare(
                serviceName,
                durable: true,
                exclusive: false,
                autoDelete: false);
            model.ExchangeDeclare(
                exchange: directExchangeName,
                type: "direct",
                durable: true,
                autoDelete: false);
            model.QueueBind(
                queue: serviceName,
                exchange: directExchangeName,
                routingKey: string.Empty);

            // This is for fanout messages distributed to all services with the specified service name.
            var fanoutExchangeName = $"{serviceName}-fanout";
            model.ExchangeDeclare(
                exchange: fanoutExchangeName,
                type: "fanout",
                durable: true,
                autoDelete: false);
            var fanoutQueueName = model
                .QueueDeclare()
                .QueueName;
            model.QueueBind(
                queue: fanoutQueueName,
                exchange: fanoutExchangeName,
                routingKey: string.Empty);

            var directConsumer = new EventingBasicConsumer(model);
            var fanoutConsumer = new EventingBasicConsumer(model);
            var workItemConsumerTag = model.BasicConsume(
                queue: serviceName,
                autoAck: true,
                consumer: directConsumer);
            var fanoutConsumerTag = model.BasicConsume(
                queue: fanoutQueueName,
                autoAck: true,
                consumer: fanoutConsumer);

            directConsumer.Received += (o, e) =>
            {
                Console.WriteLine("Received message (direct)");
            };
            fanoutConsumer.Received += (o, e) =>
            {
                Console.WriteLine("Received message (fanout)");
            };

            Console.WriteLine("[P]ublish");
            Console.WriteLine("E[x]it");
            var exit = false;

            while (!exit)
            {
                var key = Console.ReadKey();

                switch (key.Key)
                {
                    case ConsoleKey.P:
                        model
                            .BasicPublish(
                                exchange: fanoutExchangeName,
                                routingKey: string.Empty,
                                body: new byte[] { 1, 2, 3 });
                        break;
                    case ConsoleKey.X:
                        exit = true;
                        break;
                }
            }

            model.BasicCancel(workItemConsumerTag);
            model.BasicCancel(fanoutConsumerTag);

            model.Close();
            model.Dispose();

            connection.Close();
            connection.Dispose();
        }
    }
}

在两个单独的控制台窗口中运行上述代码。如果您在一个窗口中按P,您会看到一个实例输出了我所期望的结果:

Received message (fanout)

但是另一个窗口输出这个:

Received message (fanout)
Received message (direct)

尽管PublishBasic 调用指定了扇出交换名称,但还是会这样做。这里发生了什么?我如何确保直接交换不涉及这种情况?

【问题讨论】:

  • 我无法使用 RabbitMQ 3.7.14 和您的代码进行复制。我只在每个终端窗口中收到“收到的消息(扇出)”消息。也许 RabbitMQ 中有旧的绑定?您可以重置您的实例并重试吗?
  • @LukeBakken 实际上,我删除了我的 docker 容器并重新创建,现在它可以工作了。如果您将其添加为答案,我将很乐意接受。谢谢!

标签: c# .net .net-core rabbitmq


【解决方案1】:

我无法使用 RabbitMQ 3.7.14 和您的代码进行重现。我只在每个终端窗口中收到“收到的消息(扇出)”消息。也许 RabbitMQ 中有旧的绑定?您应该重置您的实例并重试。


注意:RabbitMQ 团队会监控 rabbitmq-users mailing list,并且仅有时会回答 * 上的问题。

【讨论】: