【问题标题】:How to write an EasyNetQ auto subscriber dispatcher for the ASP.NET Core services provider?如何为 ASP.NET Core 服务提供者编写 EasyNetQ 自动订阅调度程序?
【发布时间】:2019-09-30 20:25:48
【问题描述】:

因此,EasyNetQ 自动订阅者有一个基本的默认调度程序,它无法使用非无参数构造函数创建消息消费者类。

要查看实际情况,请创建一个具有所需依赖项的使用者。您可以设置自己的服务,也可以使用由框架默认自动注册的ILogger<T>

ConsumeTextMessage.cs

public class ConsumeTextMessage : IConsume<TextMessage>
{
    private readonly ILogger<ConsumeTextMessage> logger;

    public ConsumeTextMessage(ILogger<ConsumeTextMessage> logger)
    {
        this.logger = logger;
    }

    public void Consume(TextMessage message)
    {
        ...
    }
}

连接自动订阅者(在哪里/何时编写/运行此代码方面存在一些余地)。

Startup.cs

public void ConfigureServices(IServiceCollection services)
{
    services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
}

(其他地方,可能是startup.ConfigureBackgroundService

var subscriber = new AutoSubscriber(bus, "example");
subscriber.Subscribe(Assembly.GetExecutingAssembly());

现在,启动程序并发布一些消息,您应该会看到每条消息都在默认错误队列中结束。

System.MissingMethodException: No parameterless constructor defined for this object.
   at System.RuntimeTypeHandle.CreateInstance(RuntimeType type, Boolean publicOnly, Boolean wrapExceptions, Boolean& canBeCached, RuntimeMethodHandleInternal& ctor)
   at System.RuntimeType.CreateInstanceSlow(Boolean publicOnly, Boolean wrapExceptions, Boolean skipCheckThis, Boolean fillCache)
   at System.Activator.CreateInstance[T]()
   at EasyNetQ.AutoSubscribe.DefaultAutoSubscriberMessageDispatcher.DispatchAsync[TMessage,TAsyncConsumer](TMessage message)
   at EasyNetQ.Consumer.HandlerRunner.InvokeUserMessageHandlerInternalAsync(ConsumerExecutionContext context)

我知道我可以提供我的own dispatcher,但是我们如何与 ASP.NET Core 服务提供商一起工作;确保这适用于范围服务?

【问题讨论】:

    标签: asp.net-core easynetq


    【解决方案1】:

    所以,这就是我想出的。

    public class MessageDispatcher : IAutoSubscriberMessageDispatcher
    {
        private readonly IServiceProvider provider;
    
        public MessageDispatcher(IServiceProvider provider)
        {
            this.provider = provider;
        }
    
        public void Dispatch<TMessage, TConsumer>(TMessage message)
            where TMessage : class
            where TConsumer : class, IConsume<TMessage>
        {
            using(var scope = provider.CreateScope())
            {
                var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
                consumer.Consume(message);
            }
        }
    
        public async Task DispatchAsync<TMessage, TConsumer>(TMessage message)
            where TMessage : class
            where TConsumer : class, IConsumeAsync<TMessage>
        {
            using(var scope = provider.CreateScope())
            {
                var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
                await consumer.ConsumeAsync(message);
            }
        }
    }
    

    几个值得注意的点...

    IServiceProvider 依赖项是 ASP.NET Core DI 容器。一开始可能不清楚,因为在整个Startup.ConfigureServices() 中,您正在使用另一个接口IServiceCollection 注册类型。

    public MessageDispatcher(IServiceProvider provider)
    {
        this.provider = provider;
    }
    

    为了解析范围服务,您需要创建和管理范围的生命周期围绕创建和使用消费者。我正在使用GetRequiredService&lt;T&gt; 扩展方法,因为我真的想要一个讨厌的异常,而不是在我们注意到它之前可能会泄漏一段时间的空引用(以空引用异常的形式)。

    using(var scope = provider.CreateScope())
    {
        var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
        consumer.Consume(message);
    }
    

    如果您只直接使用provider,如provider.GetRequiredService&lt;T&gt;(),则在尝试解析作用域消费者或消费者的作用域依赖项时会看到这样的错误。

    抛出异常:Microsoft.Extensions.DependencyInjection.dll 中的“System.InvalidOperationException”:“无法从根提供商解析范围服务“Example.Messages.ConsumeTextMessage”。”

    为了解析作用域服务并为异步消费者正确维护它们的生命周期,您需要在正确的位置获取 async/await 关键字。您应该等待 ConsumeAsync 调用,它要求方法是异步的。在等待行和消费者中使用断点并逐行处理以更好地处理此问题!

    public async Task DispatchAsync<TMessage, TConsumer>(TMessage message)
        where TMessage : class
        where TConsumer : class, IConsumeAsync<TMessage>
    {
        using(var scope = provider.CreateScope())
        {
            var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
            await consumer.ConsumeAsync(message);
        }
    }
    

    好的,现在我们有了调度程序,我们只需要在 Startup 中正确设置所有内容。我们需要 resolve 来自提供者的调度程序,以便提供者可以正确地提供自己。这只是一种方式。

    Startup.cs

    public void ConfigureServices(IServiceCollection services)
    {
        // messaging
        services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
        services.AddSingleton<MessageDispatcher>();
        services.AddSingleton<AutoSubscriber>(provider =>
        {
            var subscriber = new AutoSubscriber(provider.GetRequiredService<IBus>(), "example")
            {
                AutoSubscriberMessageDispatcher = provider.GetRequiredService<MessageDispatcher>();
            }
        });
    
        // message handlers
        services.AddScoped<ConsumeTextMessage>();
    }
    
    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        app.ApplicationServices.GetRequiredServices<AutoSubscriber>().SubscribeAsync(Assembly.GetExecutingAssembly());
    }
    

    【讨论】:

    • 我真的很喜欢你的回答。但是,在配置 IHostBuilder(dotnet 控制台/主机应用程序)后,我无法使其正常工作。消息调度(以及该类型的任何消费者)未被调用。 builder.Services.GetRequiredService().SubscribeAsync(assembly); 知道如何适应吗?
    • 我可以查看更多上下文吗?也许是一个要点? gist.github.com
    • 好提示! Gist github 注意我没有测试这段代码并删除了我无法显示的代码。另请注意,我尝试了 IConsumeAsync 和 IConsume (相应地调整 Program.cs)。任何提示或帮助都会很棒!
    • 啊还要注意:我确实创建并注册了 IBus(尽管我的代码要点没有显示)。错误发生在运行时和调度消息时。 (它确实触发并注意到 IConsume/Async 已注册,它只是无法构造处理程序。很高兴知道:我的服务是 BackgroundService)
    猜你喜欢
    • 2014-10-23
    • 1970-01-01
    • 2011-11-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-19
    • 2020-04-22
    • 1970-01-01
    相关资源
    最近更新 更多