【问题标题】:putting Filter and Consumer in the same scope (MassTransit)将过滤器和消费者放在同一范围内(MassTransit)
【发布时间】:2020-07-01 13:32:09
【问题描述】:

启动类

services.AddScoped(typeof(CustomTransactionFilter<>));
services.AddMassTransitHostedService();
services.AddMassTransit(x =>
{
    x.AddConsumers(typeof(OrderStateOrchestrator).Assembly);

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(massTransitOption.HostName,
                 massTransitOption.VirtualHost,
                 hst =>
                 {
                     hst.Username(massTransitOption.UserName);
                     hst.Password(massTransitOption.Password);
                 });
        cfg.UseConsumeFilter(typeof(CustomTransactionFilter<>), context);
        cfg.ReceiveEndpoint($"{Program.STARTUP_PROJECT_NAME}.{nameof(OrderStateOrchestrator)}",
                            endpointConfigurator => { endpointConfigurator.ConfigureConsumer<OrderStateOrchestrator>(context); });
    });
});

过滤器类

public class CustomTransactionFilter<T> :
        IFilter<ConsumeContext<T>> where T : class
{
    private readonly IIntegrationMessagePublisher _integrationMessagePublisher;
    
    public CustomTransactionFilter(IIntegrationMessagePublisher integrationMessagePublisher)
    {
        _integrationMessagePublisher = integrationMessagePublisher;
    }

    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        await next.Send(context);
        Console.WriteLine($"TransactionFilter - {typeof(T).Namespace} : {_integrationMessagePublisher.IntegrationMessages.Count}");
        await _integrationMessagePublisher.Publish();
    }

    public void Probe(ProbeContext context)
    {
    }
}

消费类

public async Task Consume(ConsumeContext<PaymentCompletedEvent> context)
{
    PaymentCompletedEvent paymentCompletedEvent = context.Message;
    long orderId = long.Parse(paymentCompletedEvent.CorrelationId)

    await _distributedLockManager.LockAsync(OrderOperationKey(orderId),
                                            async () =>
                                            {
                                                IOrderStateMachine orderStateMachine = await _orderStateMachineFactory.BuildOrderStateMachineAsync(orderId);
                                                orderStateMachine.ChangePaymentStatus(PaymentStatuses.Completed);
                                                Console.WriteLine($"Consumer - {nameof(PaymentCompletedEvent)} : {_integrationMessagePublisher.IntegrationMessages.Count}");
                                            });
}

信息: 我尝试使用消息。当我消费一条消息时,会调用不同的程序。所有程序都会创建一些消息并将它们添加到消息持有者(IIntegrationMessagePublisher)中。消费操作完成后,我想发布所有消息。

我有一个名为 IIntegrationMessagePublisher 的类,它在内存中保存消息对象列表;因此,IIntegrationMessagePublisher 必须是一个作用域对象。当我在消费者中检查消息计数时,我看到计数为 1。但是当我在过滤器类中检查时,消息计数为 0。在这种情况下,我认为我不应该将过滤器和消费者放在同一范围内。

有人可以告诉我在公共交通配置期间我缺少什么吗?提前致谢。

【问题讨论】:

  • 你为什么要延迟消息?添加 UseInMemoryOutbox 将为您完成此操作,并且仅在消费者未引发异常时才发布消息。然后,您可以在依赖项中使用 IPublishEndpoint。
  • 感谢您的建议,但这只是一个用例。我尝试学习如何在同一范围内使用过滤器和消费者。
  • 问题是,如果范围在正确的级别,它们应该是相同的范围。我之前对您的其他问题的评论中的视频显示了可以配置所有中间件的位置,以及它如何影响范围。

标签: filter dependency-injection masstransit


【解决方案1】:

启动

services.AddMassTransitHostedService();
services.AddMassTransit(x =>
{
    x.AddConsumer<OrderStateOrchestrator>(
                                          configurator => configurator
                                             .UseFilter(new CustomTransactionFilter<OrderStateOrchestrator>())
                                          )
     .Endpoint(configurator => { configurator.Name = $"{Program.STARTUP_PROJECT_NAME}.{nameof(OrderStateOrchestrator)}"; });


    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(massTransitOption.HostName,
                 massTransitOption.VirtualHost,
                 hst =>
                 {
                     hst.Username(massTransitOption.UserName);
                     hst.Password(massTransitOption.Password);
                 });
        cfg.ConfigureEndpoints(context);
    });
});

过滤器

 public class CustomTransactionFilter<TConsumer> :
     IFilter<ConsumerConsumeContext<TConsumer>> where TConsumer : class
 {
     public async Task Send(ConsumerConsumeContext<TConsumer> context, IPipe<ConsumerConsumeContext<TConsumer>> next)
     {
         var serviceProvider = context.GetPayload<IServiceProvider>();
         var integrationMessagePublisher = serviceProvider.GetRequiredService<IIntegrationMessagePublisher>();

         await next.Send(context);
         await integrationMessagePublisher.Publish();
     }

     public void Probe(ProbeContext context)
     {
         context.CreateScope("TransactionFilter");
     }
 }

我看了video 几次,我终于明白了。可以在 GitHub 上分享代码吗?这将使检查代码更容易。

感谢您的帮助@ChrisPatterson

【讨论】:

    猜你喜欢
    • 2021-07-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多