【问题标题】:Publish generic (interface based) message and Consume concrete (concrete class) message in MassTransit在 MassTransit 中发布通用(基于接口)消息和使用具体(具体类)消息
【发布时间】:2020-06-21 21:31:42
【问题描述】:

我有这个设计选择问题,不知何故苦苦挣扎,但徒劳无功。它仅适用于特定场景。

我正在尝试在 MassTransit 中发布和使用消息。 例如:(Publisher - 一个简单的控制台应用)

IShape message = GetShape(/**Business Logic will return some concrete object (Circle or square) based on some business inputs**/);  
bus.Publish(message);

(消费者 - CircleConsumer 和 SquareConsumer)

 class CircleConsumer : IConsumer<IShape>
    {
        public Task Consume(ConsumeContext<IShape> context)
        {
            var circle = context.Message as Circle;
            return Task.CompletedTask;
        }
    }

    class SquareConsumer : IConsumer<IShape>
    {
        public Task Consume(ConsumeContext<IShape> context)
        {
            var square = context.Message as Square;
            return Task.CompletedTask;
        }
    }

(.Net Core 托管服务项目中的消费者配置)

 public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<Worker>()
                   .AddScoped<SquareConsumer>()
                    .AddScoped<CircleConsumer>()
                    .AddMassTransit(cfg =>
                     {
                         cfg.AddBus(ConfigureBus);
                         cfg.AddConsumer<SquareConsumer>();
                         cfg.AddConsumer<CircleConsumer>();
                     })
                    .AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>())
                    .AddSingleton<IHostedService, TestMTConsumerHostedService>();

                    IBusControl ConfigureBus(IServiceProvider provider)
                    {
                        return Bus.Factory.CreateUsingRabbitMq(cfg =>
                        {
                            var host = cfg.Host(hostContext.Configuration["RabbmitMQ:Server:Host"], hostContext.Configuration["RabbmitMQ:Server:VirtualHost"], h =>
                            {
                                h.Username(hostContext.Configuration["RabbmitMQ:Auth:Username"]);
                                h.Password(hostContext.Configuration["RabbmitMQ:Auth:Password"]);
                            });

                            cfg.ReceiveEndpoint("CircleQueue", ep =>
                            {
                                ep.PrefetchCount = 16;
                                ep.UseMessageRetry(r => r.Interval(2, 100));
                                ep.Consumer<CircleConsumer>(provider);
                            });

                            cfg.ReceiveEndpoint("SquareQueue", ep =>
                            {
                                ep.PrefetchCount = 16;
                                ep.UseMessageRetry(r => r.Interval(2, 100));
                                ep.Consumer<SquareConsumer>(provider);
                            });
                        });
                    }

                });

我的要求是让发布者在不了解具体类的情况下发布消息。并且只有一个消费者根据消息类型接收消息。

但看起来消费者都在接收消息并且投射也不起作用。 Desired:假设当发布者发送 Square 对象时,只有 Square 消费者应该收到调用。但是,就我而言,SquareConsumer 和 CircleConsumer 都收到了消息。

作为一种解决方法,这是可行的:

  1. 始终发布具体对象。

            bus.Publish(new Square());
    
  2. 用具体类型声明消费者。

           class CircleConsumer : IConsumer<Circle>
            {
                public Task Consume(ConsumeContext<Circle> context)
                {
                    var circle = context.Message;
                    return Task.CompletedTask;
                }
            }
    
            class SquareConsumer : IConsumer<Square>
            {
                public Task Consume(ConsumeContext<Square> context)
                {
                    var square = context.Message;
                    return Task.CompletedTask;
                }
            }
    

但是,如果我能通用地做到这一点,那就太好了。

有什么建议吗?

【问题讨论】:

    标签: masstransit message-type


    【解决方案1】:

    如果您像这样更改代码:

    object message = GetShape(/**Business Logic will return some concrete object (Circle or square) based on some business inputs**/);  
    bus.Publish(message);
    

    和消费者

    class CircleConsumer : IConsumer<Circle>
    {
        public Task Consume(ConsumeContext<Circle> context)
        {
            // do circle stuff
        }
    }
    
    class SquareConsumer : IConsumer<Square>
    {
        public Task Consume(ConsumeContext<Square> context)
        {
            // do square stuff
        }
    }
    

    它将按预期工作。

    这里我详细说明一下变化:

    1. Publish 与特定类型的实例一起使用意味着使用Publish&lt;T&gt;(T message) 重载,它使用T 作为消息类型。当显式将消息类型设置为object 时,我们调用Publish(object message) 重载。在这种情况下,MassTransit 将查找消息实现的所有类型。
    2. 如果您的目标是使用具体类型的消息,则不需要使用共享接口类型的消息。您只需要为这些特定类型创建消费者。只要您按照我在上一点中描述的方式使用发布,消息就会同时发送到IShapeCircle 交换(例如)。

    【讨论】:

    • 感谢您的解决方案。我昨天已经接受并采取了类似的方法。但是,我在考虑不必更改与业务相关的方法的方法签名,并使它们更清洁和有条理。但是,嘿,我们已经有了一个可行的解决方案。我也在下面添加了我的方法作为答案。
    • 我肯定知道,当你使用问题中的代码时,MT 会使用Publish&lt;T&gt;(T message) 重载,它使用T 作为消息类型。这里的技巧是使用 Publish(object messge)` ,它强制 MT 找出消息类型。这就是为什么使用 object xxx= 而不是 var 或具体类型来解决发布问题的原因。如果您只注意到消费者的变化,您可能会错过那部分。
    【解决方案2】:

    更新:我最终采用了以下方法。但是,我希望 MassTransit 能够在本质上路由纯粹多态的消息。这只是一种解决方法,而不是真正的解决方案。仍然欢迎使用新方法。

    在具体类中使用反射和朋友方法的帮助,我得到了这个。

    出版商:

                IShape message = GetShape(text);
                var castedMessage = ReflectionHelper.CastToConcreteType(message);
                bus.Publish(castedMessage);
    
            public static class ReflectionHelper
            {
                public static object CastToConcreteType(object obj)
                {
                    MethodInfo castMethod = obj.GetType().GetMethod("Cast").MakeGenericMethod(obj.GetType());
                    return castMethod.Invoke(null, new object[] { obj });
                }
            }
    

    消息类型的接口和具体类:

        public interface IShape
            {
                public string Color { get;  }
                public string Name { get; }
            }
    
            public class Circle : IShape, ITypeCastable
        {
                public string Color => "Red";
                public string Name => $"{Color}Circle";
                T ITypeCastable.Cast<T>(object obj) => Cast<T>(obj);
                public static T Cast<T>(object o) => (T)o;
            }
            public class Square : IShape, ITypeCastable
            {
                public string Color => "Green";
                public string Name => $"{Color}Square";
                T ITypeCastable.Cast<T>(object obj) => Cast<T>(obj);
                public static T Cast<T>(object o) => (T)o;
            }
    
            public interface ITypeCastable
            {
                T Cast<T>(object obj);
            }
    

    消费者:通过用泛型类型供应中的具体类名替换接口,进行了非常小的更改。

        class CircleConsumer : IConsumer<Circle>
        {
            public Task Consume(ConsumeContext<Circle> context)
            {
                var circle = context.Message;
                return Task.CompletedTask;
            }
        }
    
        class SquareConsumer : IConsumer<Square>
        {
            public Task Consume(ConsumeContext<Square> context)
            {
                var square = context.Message;
                return Task.CompletedTask;
            }
        }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2023-03-29
      • 2017-11-18
      • 1970-01-01
      • 1970-01-01
      • 2017-06-18
      • 1970-01-01
      • 1970-01-01
      • 2010-12-02
      相关资源
      最近更新 更多