【发布时间】:2016-11-10 21:46:25
【问题描述】:
我一直在使用 ServiceStack MQ 服务器/客户端在我的平台中启用基于消息的架构,并且它一直在完美地工作。我现在正在尝试做一些我认为 SS 消息生产者/消费者不支持的事情。
基本上,我在一个集中式数据中心发出消息(事件),并且我在美国各地通过一个不可靠的网络拥有约 2000 个分散节点,这些节点需要潜在地了解此事件,但该事件需要针对只有〜2000个节点之一。我需要具有 Pub/Sub 的任意命名通道的灵活性,但需要 MQ 的持久性。我从 Pub/Sub 开始,但网络太不可靠,所以我已将解决方案转移到使用 RedisMQServer。我让它工作,但想确保我没有在界面中丢失任何东西。我很好奇 SS 的创建者是否已经考虑过这个用例,如果是这样,那么讨论的结果是什么?这确实与使用 POCO 来驱动消息消费的结果/动作的概念相矛盾。也许是这个原因?
这是我的制作人
public ExpressLightServiceResponse Get(ExpressLightServiceRequest query)
{
var result = new ExpressLightServiceResponse();
var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run);
var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName");
var typeBuilder = moduleBuilder.DefineType(string.Format("EventA{0}", query.Store), TypeAttributes.Public);
typeBuilder.DefineDefaultConstructor(MethodAttributes.Public);
var newType = typeBuilder.CreateType();
using (var messageProducer = _messageService.CreateMessageProducer())
{
var message = MessageFactory.Create(newType.CreateInstance());
messageProducer.Publish(message);
}
return result;
}
这是我的消费者
public class ServerAppHost : AppHostHttpListenerBase
{
private readonly string _store;
public string StoreQueue => $"EventA{_store}";
public ServerAppHost(string store) : base("Express Light Server", typeof(PubSubServiceStatsService).Assembly)
{
_store = store;
}
public override void Configure(Container container)
{
container.Register<IRedisClientsManager>(new PooledRedisClientManager(ConfigurationManager.ConnectionStrings["Redis"].ConnectionString));
var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run);
var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName");
var typeBuilder = moduleBuilder.DefineType(StoreQueue, TypeAttributes.Public);
typeBuilder.DefineDefaultConstructor(MethodAttributes.Public);
var newType = typeBuilder.CreateType();
var mi = typeof(Temp).GetMethod("Foo");
var fooRef = mi.MakeGenericMethod(newType);
fooRef.Invoke(new Temp(container.Resolve<IRedisClientsManager>()), null);
}
}
public class Temp
{
private readonly IRedisClientsManager _redisClientsManager;
public Temp(IRedisClientsManager redisClientsManager)
{
_redisClientsManager = redisClientsManager;
}
public void Foo<T>()
{
var mqService = new RedisMqServer(_redisClientsManager);
mqService.RegisterHandler<T>(DoWork);
mqService.Start();
}
private object DoWork<T>(IMessage<T> arg)
{
//Do work
return null;
}
}
这给了我 Pub/Sub 的灵活性和队列的持久性。有没有人看到/知道实现这一目标的更“原生”方式?
【问题讨论】:
标签: servicestack