[Question title]: MassTransit: Queue is not created for topic exchange
[Posted]: 2019-05-17 09:23:24
[Question]:

I want to use a topic exchange to send stock price data to RabbitMQ. The idea is to publish to the topic exchange with routing keys of the following form:

<message-type>.<ticker>

I was able to do this with the native RabbitMQ client, but I can't figure out how to do it with MassTransit.

// setup topologies
rabbitCfg.Send<ComMessage>(x =>
{
   x.UseRoutingKeyFormatter(context => 
        $"quote.{context.Message.Ticker}");
});

rabbitCfg.Message<ComMessage>(x => x.SetEntityName("Quotes"));
rabbitCfg.Publish<ComMessage>(x =>
{
   x.ExchangeType = ExchangeType.Topic;
});


// set up receiver
rabbitCfg.ReceiveEndpoint(host, "MSFT", e =>
{
   e.Bind("Quotes", c =>
      {
         c.RoutingKey = "quote.MSFT";
         c.ExchangeType = ExchangeType.Topic;
      });

   e.Consumer<PriceConsumer>();
});

Sending a message:

await _bus.Publish(new ComMessage
{
   Ticker = "MSFT",
   Price = "10"
});

However, this does not work. The queue is not created, although the exchange does receive the messages.

What is going wrong?

[Discussion]:

    Tags: c# rabbitmq masstransit


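    [Solution 1]:

    I think you missed one important line. For reference, I have included the source of a working unit test that uses a topic exchange.

    In your receive endpoint, you need to disable the automatic exchange binding.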

    cfg.ReceiveEndpoint(host, "MSFT", x =>
    {
        x.ConfigureConsumeTopology = false;
        ...
    });
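
Applied to the receive endpoint from the question, the change might look like the sketch below. It reuses the names from the question (`rabbitCfg`, `host`, `PriceConsumer`, the `Quotes` exchange) and assumes a MassTransit version that exposes `ConfigureConsumeTopology`. It is a configuration fragment meant to sit inside the bus factory callback, not a complete program.

```csharp
// Sketch only: the question's receive endpoint with the one-line fix applied.
// rabbitCfg, host and PriceConsumer are the names used in the question.
rabbitCfg.ReceiveEndpoint(host, "MSFT", e =>
{
    // Turn off the automatic exchange binding so that the explicit
    // topic binding below is the only binding for this queue.
    e.ConfigureConsumeTopology = false;

    e.Bind("Quotes", c =>
    {
        c.RoutingKey = "quote.MSFT";
        c.ExchangeType = ExchangeType.Topic;
    });

    e.Consumer<PriceConsumer>();
});
```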
    

    A working example looks like this:

    using System;
    using System.Threading.Tasks;
    using GreenPipes.Util;
    using MassTransit; // IConsumer<T>, ConsumeContext<T>
    using NUnit.Framework;
    using RabbitMQ.Client;
    using RoutingKeyTopic;
    
    
    namespace RoutingKeyTopic
    {
        public class Message
        {
            public Message(decimal price, string symbol)
            {
                Price = price;
                Symbol = symbol;
            }
    
            public string Symbol { get; set; }
    
            public decimal Price { get; set; }
        }
    }
    
    
    [TestFixture]
    public class Using_a_routing_key_and_topic_exchange :
        RabbitMqTestFixture
    {
        [Test]
        public async Task Should_support_routing_by_key_and_exchange_name()
        {
            var fooHandle = await Subscribe("MSFT");
            try
            {
                var barHandle = await Subscribe("UBER");
                try
                {
                    await Bus.Publish(new Message(100.0m, "MSFT"));
                    await Bus.Publish(new Message(3.50m, "UBER"));
    
                    await Consumer.Microsoft;
                    await Consumer.Uber;
                }
                finally
                {
                    await barHandle.StopAsync(TestCancellationToken);
                }
            }
            finally
            {
                await fooHandle.StopAsync(TestCancellationToken);
            }
        }
    
        async Task<HostReceiveEndpointHandle> Subscribe(string key)
        {
            var queueName = $"Stock-{key}";
            var handle = Host.ConnectReceiveEndpoint(queueName, x =>
            {
                x.ConfigureConsumeTopology = false;
                x.Consumer<Consumer>();
    
                x.Bind<Message>(e =>
                {
                    e.RoutingKey = GetRoutingKey(key);
                    e.ExchangeType = ExchangeType.Topic;
                });
            });
    
            await handle.Ready;
    
            return handle;
        }
    
        protected override void ConfigureRabbitMqBusHost(IRabbitMqBusFactoryConfigurator configurator, IRabbitMqHost host)
        {
            base.ConfigureRabbitMqBusHost(configurator, host);
    
            configurator.Message<Message>(x => x.SetEntityName(ExchangeName));
            configurator.Publish<Message>(x => x.ExchangeType = ExchangeType.Topic);
    
            configurator.Send<Message>(x => x.UseRoutingKeyFormatter(context => GetRoutingKey(context.Message.Symbol)));
        }
    
        string ExchangeName { get; } = "Quotes";
    
        string GetRoutingKey(string routingKey)
        {
            return $"quote.{routingKey}";
        }
    
    
        class Consumer :
            IConsumer<Message>
        {
            static readonly TaskCompletionSource<Message> _microsoft = new TaskCompletionSource<Message>();
            static readonly TaskCompletionSource<Message> _uber = new TaskCompletionSource<Message>();
            public static Task<Message> Microsoft => _microsoft.Task;
            public static Task<Message> Uber => _uber.Task;
    
            public Task Consume(ConsumeContext<Message> context)
            {
                Console.WriteLine($"Received {context.Message.Symbol} for {context.RoutingKey()}");
    
                if (context.Message.Symbol == "MSFT")
                    _microsoft.TrySetResult(context.Message);
    
                if (context.Message.Symbol == "UBER")
                    _uber.TrySetResult(context.Message);
    
                return TaskUtil.Completed;
            }
        }
    }
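
The bindings in the test use exact keys such as `quote.MSFT`. Since `Quotes` is a topic exchange, a binding key may also contain wildcards: `*` stands for exactly one dot-separated word and `#` for zero or more words. The sketch below illustrates that matching rule in plain C#. It is not RabbitMQ's or MassTransit's implementation, and `TopicMatch` and `IsMatch` are hypothetical names chosen for this example.

```csharp
using System;

static class TopicMatch
{
    // Returns true when routingKey satisfies the topic binding pattern.
    public static bool IsMatch(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    // Compares pattern words p[i..] against routing key words k[j..].
    static bool Match(string[] p, int i, string[] k, int j)
    {
        // Pattern exhausted: succeed only if the key is exhausted too.
        if (i == p.Length)
            return j == k.Length;

        if (p[i] == "#")
        {
            // '#' may absorb zero or more words; try every split point.
            for (int n = j; n <= k.Length; n++)
            {
                if (Match(p, i + 1, k, n))
                    return true;
            }
            return false;
        }

        // Any other pattern word needs one key word to consume.
        if (j == k.Length)
            return false;

        // '*' accepts any single word; a literal must be equal.
        if (p[i] == "*" || p[i] == k[j])
            return Match(p, i + 1, k, j + 1);

        return false;
    }
}
```

Under this rule, `TopicMatch.IsMatch("quote.*", "quote.UBER")` is true, whereas `TopicMatch.IsMatch("quote.*", "quote.MSFT.bid")` is false because `*` covers only one word.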
    

    [Comments]:

    • Thanks for the answer. Actually, I tried using BindMessageExchanges and it didn't help. I'll double-check your example against my code when I'm back at my laptop.