【问题标题】:MassTransit Consumer never receives messageMassTransit Consumer 从未收到消息
【发布时间】:2020-10-25 08:32:09
【问题描述】:

我正在按照文档构建一个演示应用程序,以在 ASP.NET Core 应用程序中使用 MassTransit 与 RabbitMQ 和 Autofac:

我的程序代码:

namespace MessageDemo
{
    public class Program
    {
        public static void Main(string[] args)
        {
            var host = Host.CreateDefaultBuilder(args)
                .UseServiceProviderFactory(new AutofacServiceProviderFactory())
                .ConfigureWebHostDefaults(webHostBuilder =>
                {
                    webHostBuilder
                        .UseContentRoot(Directory.GetCurrentDirectory())
                        .UseIISIntegration()
                        .UseStartup<Startup>();
                })
                .Build();
            host.Run();
        }
    }
}

我的创业:

    public class Startup
    {
        public Startup(IWebHostEnvironment env)
        {
            var builder = new ConfigurationBuilder()
                .SetBasePath(env.ContentRootPath)
                .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
                .AddJsonFile($"appsettings.{env.EnvironmentName}.json", optional: true)
                .AddEnvironmentVariables();
            this.Configuration = builder.Build();
        }

        public IConfiguration Configuration { get; }
        public ILifetimeScope AutofacContainer { get; set; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddOptions();
            services.AddControllers();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            this.AutofacContainer = app.ApplicationServices.GetAutofacRoot();

            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }

        // ConfigureContainer is where you can register things directly
        public void ConfigureContainer(ContainerBuilder builder)
        {

            builder.RegisterType<DemoContent>().As<IDemoContent>();
            builder.RegisterType<WeatherForecast>().As<IWeatherForecast>();

            builder.AddMassTransit(x =>
            {
                x.AddConsumer<DemoConsumer>();

                x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
                {

                    cfg.Host("rabbitmq://my_container_ip/", host =>
                    {
                        host.Username("devuser");
                        host.Password("devuser");
                    });


                    cfg.ReceiveEndpoint("submit-data", ec =>
                    {
                        // Configure a single consumer
                        ec.ConfigureConsumer<DemoConsumer>(context);
                    });

                }));
            });
        }
    }

我的消费者:

    public class DemoConsumer : IConsumer<IDemoContent>
    {
        public async Task Consume(ConsumeContext<IDemoContent> context)
        {
            Debug.WriteLine($"Write content: {context.Message.Data}");
            await Console.Out.WriteLineAsync($"Write content: {context.Message.Data}");
        }
    }

只是为了测试,我通过点击控制器端点之一触发发布,PublishEndpoint 由容器注入:

        // GET: api/Demo
        [HttpGet]
        public async void Get()
        {
            await _endpoint.Publish<IDemoContent>(new
            {
                Data = "Some random content"
            }, new CancellationToken());
        }

这一切似乎都在工作 - 没有错误信息 - 添加了一个使用 InMemoryTestHarness 的演示单元测试,并且正在运行 - 我的 RabbitMQ 实例在 Manager Overview 中注册发布的消息

我在 RabbitMQ 管理 UI 中获得发布者确认,但消息显示为 Unrouteable(drop)。

【问题讨论】:

  • 注意:我通过添加 services.AddMassTransitHostedService(); 进行了以下建议的修复。到我的 ConfigureServices 方法,但还需要注册它的依赖 builder.RegisterType().As().SingleInstance();

标签: asp.net-core rabbitmq masstransit consumer


【解决方案1】:

由于您尚未添加托管服务,因此总线无法启动

services.AddMassTransitHostedService();

就在代码 sn-p in the docs 中。

【讨论】:

  • 谢谢,不知道我是怎么打错电话的。还需要注册 BusRegistry builder.RegisterType().As().SingleInstance();
猜你喜欢
  • 2012-12-13
  • 2013-03-08
  • 2023-03-28
  • 1970-01-01
  • 2016-09-07
  • 1970-01-01
  • 2019-11-20
  • 2019-01-23
相关资源
最近更新 更多