【问题标题】:MassTransit Filtering messages a consumer can handleMassTransit 过滤消费者可以处理的消息
【发布时间】:2020-12-08 12:13:21
【问题描述】:

我有一个像这样的通用消息接口:

public interface IMyMessage
{
     int EventCode {get;}
}

现在我有多个消费者处理这条消息:

public class MyConsumer1: IConsumer<IMyMessage>{...}
public class MyConsumer2: IConsumer<IMyMessage>{...}

我希望 MyConsumer1 仅处理 EventCode==1 的那些消息,并让 MyConsumer2 处理 EventCode==2 的所有消息。

我知道我可以在 Consume 方法中做一个 if 语句,但想知道是否有更好的方法,比如一些路由过滤器?

我的首选方式是创建一个属性,即。 HandlesEventCodeAttribute(1) 并将其应用于消费者。

我还使用 Autofac 容器与 MassTransit 的集成。

请帮忙。

谢谢

【问题讨论】:

    标签: .net-core filter autofac masstransit


    【解决方案1】:

    在我就实际问题提供任何意见之前,我想问一下为什么您要使用具有属性的相同消息类型来确定哪些消费者实际使用了该消息。有更好(更有效)的方法可用,例如使用 RabbitMQ 的 DIRECT 交换。

    您可以创建自己的属性,并创建一个middleware filter 来查看消费者,查看它是否具有自定义属性,然后使用该属性中的值检查消息并在消费者是对此不感兴趣

    一个完整的工作示例如下所示:

    首先,创建属性。

    class EventCodeAttribute :
        Attribute
    {
        public int EventCode { get; }
    
        public EventCodeAttribute(int eventCode)
        {
            EventCode = eventCode;
        }
    }
    

    以及消息类型:

    interface IEventMessage
    {
        int EventCode { get; }
    }
    

    中间件过滤器:

    class EventCodeFilter<TConsumer> :
        IFilter<ConsumerConsumeContext<TConsumer, IEventMessage>>
        where TConsumer : class
    {
        readonly int _eventCode;
    
        public EventCodeFilter()
        {
            var attribute = typeof(TConsumer).GetCustomAttribute<EventCodeAttribute>();
            if (attribute == null)
                throw new ArgumentException("Message does not have the attribute required");
    
            _eventCode = attribute.EventCode;
        }
    
        public async Task Send(ConsumerConsumeContext<TConsumer, IEventMessage> context, IPipe<ConsumerConsumeContext<TConsumer, IEventMessage>> next)
        {
            if (context.Message.EventCode.Equals(_eventCode))
            {
                await next.Send(context);
            }
        }
    
        public void Probe(ProbeContext context)
        {
            var scope = context.CreateFilterScope("eventCode");
            scope.Add("code", _eventCode);
        }
    }
    

    样本消费者:

    [EventCode(27)]
    class EventCodeConsumer :
        IConsumer<IEventMessage>
    {
        public async Task Consume(ConsumeContext<IEventMessage> context)
        {
        }
    }
    

    最后,配置消费者使用过滤器:

    builder.AddMassTransit(cfg =>
    {
        cfg.AddConsumer<EventCodeConsumer>(x =>
            x.ConsumerMessage<IEventMessage>(p => p.UseFilter(new EventCodeFilter<EventCodeConsumer>())));
    });
    

    【讨论】:

    • 感谢您的回答。这正是我所需要的。现在,关于您关于单个消息的问题......我无法控制发布者,发布者使用池从 PLC 控制器读取,然后对于每个读取的数据,它使用他从 PLC 读取的事件代码发布消息.
    • 啊,是的,完全明白了。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-07-11
    • 1970-01-01
    • 2016-05-04
    • 2016-06-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多