【问题标题】:ServiceStack Message FilteringServiceStack 消息过滤
【发布时间】:2016-11-10 21:46:25
【问题描述】:

我一直在使用 ServiceStack MQ 服务器/客户端在我的平台中启用基于消息的架构,并且它一直在完美地工作。我现在正在尝试做一些我认为 SS 消息生产者/消费者不支持的事情。

基本上,我在一个集中式数据中心发出消息(事件),并且我在美国各地通过一个不可靠的网络拥有约 2000 个分散节点,这些节点需要潜在地了解此事件,但该事件需要针对只有〜2000个节点之一。我需要具有 Pub/Sub 的任意命名通道的灵活性,但需要 MQ 的持久性。我从 Pub/Sub 开始,但网络太不可靠,所以我已将解决方案转移到使用 RedisMQServer。我让它工作,但想确保我没有在界面中丢失任何东西。我很好奇 SS 的创建者是否已经考虑过这个用例,如果是这样,那么讨论的结果是什么?这确实与使用 POCO 来驱动消息消费的结果/动作的概念相矛盾。也许是这个原因?

这是我的制作人

    public ExpressLightServiceResponse Get(ExpressLightServiceRequest query)
    {
        var result = new ExpressLightServiceResponse();

        var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run);
        var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName");
        var typeBuilder = moduleBuilder.DefineType(string.Format("EventA{0}", query.Store), TypeAttributes.Public);

        typeBuilder.DefineDefaultConstructor(MethodAttributes.Public);

        var newType = typeBuilder.CreateType();

        using (var messageProducer = _messageService.CreateMessageProducer())
        {
            var message = MessageFactory.Create(newType.CreateInstance());
            messageProducer.Publish(message);
        }

        return result;
    }

这是我的消费者

public class ServerAppHost : AppHostHttpListenerBase
{
    private readonly string _store;

    public string StoreQueue => $"EventA{_store}";

    public ServerAppHost(string store) : base("Express Light Server", typeof(PubSubServiceStatsService).Assembly)
    {
        _store = store;
    }

    public override void Configure(Container container)
    {
        container.Register<IRedisClientsManager>(new PooledRedisClientManager(ConfigurationManager.ConnectionStrings["Redis"].ConnectionString));

        var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run);
        var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName");
        var typeBuilder = moduleBuilder.DefineType(StoreQueue, TypeAttributes.Public);

        typeBuilder.DefineDefaultConstructor(MethodAttributes.Public);

        var newType = typeBuilder.CreateType();

        var mi = typeof(Temp).GetMethod("Foo");
        var fooRef = mi.MakeGenericMethod(newType);
        fooRef.Invoke(new Temp(container.Resolve<IRedisClientsManager>()), null);
    }
}

public class Temp
{
    private readonly IRedisClientsManager _redisClientsManager;

    public Temp(IRedisClientsManager redisClientsManager)
    {
        _redisClientsManager = redisClientsManager;
    }

    public void Foo<T>()
    {
        var mqService = new RedisMqServer(_redisClientsManager);
        mqService.RegisterHandler<T>(DoWork);
        mqService.Start();
    }

    private object DoWork<T>(IMessage<T> arg)
    {
        //Do work
        return null;
    }
}

这给了我 Pub/Sub 的灵活性和队列的持久性。有没有人看到/知道实现这一目标的更“原生”方式?

【问题讨论】:

    标签: servicestack


    【解决方案1】:

    您的 AppHost 中应该只注册了 1 个 MQ 主机,所以我首先将它从您的包装类中删除并让它只注册处理程序,例如:

    public override void Configure(Container container)
    {
        //...
    
        container.Register<IMessageService>(
            c => new RedisMqServer(c.Resolve<IRedisClientsManager>());
        var mqServer = container.Resolve<IMessageService>();
    
        fooRef.Invoke(new Temp(mqServer), null);
    
        mqServer.Start();
    }
    
    public class Temp
    {
        private readonly IMessageService mqServer;
        public Temp(IMessageService mqServer)
        {
            this.mqServer = mqServer;
        }
    
        public void Foo<T>() => mqService.RegisterHandler<T>(DoWork);
    }
    

    但是这种方法不适合 ServiceStack,它鼓励使用代码优先消息,它定义了客户端/服务器用来处理发送和接收消息的服务契约。因此,如果您想使用 ServiceStack 发送自定义消息,我建议您为每条消息设置一个单独的类,否则使用像 SendEvent 这样的通用类型,其中消息或事件类型是该类的属性。

    否则,如果您想继续使用自定义消息,请不要使用RedisMqServer,您可以只使用dedicated MQ like Rabbit MQ,或者如果您更喜欢使用Redis List directly - 这是所有 Redis MQ 在下面使用的数据结构。

    【讨论】:

      猜你喜欢
      • 2013-12-31
      • 2020-03-31
      • 2014-07-22
      • 2017-08-21
      • 2022-06-17
      • 1970-01-01
      • 2013-09-16
      • 1970-01-01
      • 2017-06-08
      相关资源
      最近更新 更多