【问题标题】:How do I configure scoped filters to work the same for consumers as for activities如何配置范围过滤器以对消费者和活动一样工作
【发布时间】:2020-10-21 08:29:46
【问题描述】:

我已经设法为消费者配置了范围服务和范围过滤器,这意味着我可以在实现IFilter<ConsumeContext<T>> 的过滤器中为范围服务设置一个值,并使用UseConsumeFilter 注册过滤器。过滤器在我的作用域服务中设置一个值,然后可以将作用域服务注入到我的使用者中,并且仍然具有设置的值。

我尝试使用IFilter<ExecuteContext<TArguments>> 对活动执行相同的操作,并使用UseExecuteActivityFilter 注册我的过滤器。

在 ExecuteActivityContext 中设置的值在 Activity 中不可访问。我认为它们成为两个不同的 DI 示波器。我将分享我的活动和消费者实现中的代码,也许活动一中缺少一些东西。我试图只保留重要的部分,所以如果某处存在非法语法,那是我试图清理 SO 的代码。

这是我以错误的方式使用 DI 还是使用 DI 进行活动的问题?我尝试遵循 masstransits 网站上的“Scoped Filters”文档。我在 .net core 3.1 和 masstransit 7.0.4 上。

用于测试的范围服务

//Interface
public interface IContextService
{
    string TenantId { get; set; }       
}
//DI registration
services.AddScoped<IContextService, ContextService>();

活动配置,这不起作用

//Filter
public class RetreiveContextExecuteFilter<TArguments> : IFilter<ExecuteContext<TArguments>>
       where TArguments : class
    {
        public IContextService _contextService { get; }

        public RetreiveContextExecuteFilter(IContextService contextService)
        {
            _contextService = contextService;
        }

        public async Task Send(ExecuteContext<TArguments> context, IPipe<ExecuteContext<TArguments>> next)
        {
            _contextService.tenantId = "test-tenant";

            await next.Send(context);
        }

        public void Probe(ProbeContext context)
        {
            var scope = context.CreateFilterScope("testcontextinformation");
        }
    }

//Activity
public class ExampleActivity
    : IExecuteActivity<ExampleActivityArguments>
    {
        private readonly IContextService _contextService;

        public ExampleActivity(IContextService contextService)
        {
            _contextService = contextService;
        }

        public async Task<ExecutionResult> Execute(ExecuteContext<ExampleActivityArguments> context)
        {
            var tenant = _contextService.tenantId; //Empty
        }
    }

//DI
services.AddMassTransit(cfg =>
    {
    cfg.AddActivitiesFromNamespaceContaining<ExampleActivity>();
    services.TryAddSingleton(KebabCaseEndpointNameFormatter.Instance);                       
    cfg.UsingRabbitMq(ConfigureBus);
});

private static void ConfigureBus(IBusRegistrationContext context, IRabbitMqBusFactoryConfigurator configurator)
{
    configurator.ConfigureEndpoints(context);
    configurator.UseExecuteActivityFilter(typeof(RetreiveContextExecuteFilter<>), context);
}

消费者配置,这正在工作

//Filter definition
public class RetreiveContextConsumeFilter<T> : IFilter<ConsumeContext<T>>
    where T : class
{
    public IContextService _contextService { get; }

    public RetreiveContextConsumeFilter(IContextService contextService)
    {
        _contextService = contextService;
    }

    public Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        _contextService.TenantId = "test tenant";
        return next.Send(context);
    }

    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("contextinformation");
    }
}

//Consumer
public class ExampleConsumer
: IConsumer<ExampleEvent>
{
    private readonly IContextService _contextService;

    public ExampleConsumer(IContextService contextService)
    {
        _contextService = contextService;
    }

    public async Task Consume(ConsumeContext<ExampleEvent> context)
    {
         var id = _contextService.TenantId(); //Correct value
    }
}
//DI
services.AddMassTransit(cfg =>
{              
    cfg.AddConsumersFromNamespaceContaining<ExampleConsumer>();                 
    services.TryAddSingleton(KebabCaseEndpointNameFormatter.Instance);                                    
    cfg.UsingRabbitMq(ConfigureBus);
});

private static void ConfigureBus(IBusRegistrationContext context, IRabbitMqBusFactoryConfigurator configurator)
{
    configurator.ConfigureEndpoints(context);
    configurator.UseConsumeFilter(typeof(RetreiveContextConsumeFilter<>), context);
}

【问题讨论】:

    标签: c# asp.net-core dependency-injection masstransit


    【解决方案1】:

    首先猜测,是你的配置顺序不正确。 MassTransit 构建管道,并且您在过滤器之前配置端点,这将使过滤器在端点之后运行。这是我的猜测。

    将消费者更改为:

    configurator.UseConsumeFilter(typeof(RetreiveContextConsumeFilter<>), context);
    configurator.ConfigureEndpoints(context);
    

    将活动更改为:

    configurator.UseExecuteActivityFilter(typeof(RetreiveContextExecuteFilter<>), context);
    configurator.ConfigureEndpoints(context);
    

    【讨论】:

    • 我已经尝试了这两个版本并且反转它没有帮助,它可能会修复其他场景但不是在这里。为了让你更容易看到,我做了一个最小的 repo 来显示问题。当我把所有东西都隔离开时,可能更容易看到我错过了什么。 GithubExample 。在示例中,我在端点之前配置了过滤器。如示例所示,Consumer 可以获取过滤器中设置的租户 ID,但 Activity 不能。
    • 如果我将configurator.UseServiceScope 添加到我的 github 示例中,则在其他过滤器之前,范围似乎可以正常工作。这是一些糟糕的黑客行为还是可能需要执行活动过滤器?如果我这样做,这是配置的最终结果,代码很长,所以我在测试仓库中创建了一个分支来显示它UseServiceScope-branch
    • UseServiceScope 是一个很好的解决方案,如果它可以解决您的问题。我不确定为什么活动会有所不同,但会看看你的例子。
    猜你喜欢
    • 1970-01-01
    • 2014-03-04
    • 2014-10-11
    • 2019-03-26
    • 1970-01-01
    • 2019-10-17
    • 1970-01-01
    • 1970-01-01
    • 2023-04-08
    相关资源
    最近更新 更多