【发布时间】:2016-12-29 07:37:45
【问题描述】:
我一直在尝试制作一个网站演示,该演示使用 MassTransit 和 RabbitMQ 将消息发布到在 Service Fabric 上作为有状态服务运行的服务。
一切顺利,我的客户会发消息:
IBusControl bus = BusConfigurator.ConfigureBus();
Uri sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}" + $"{RabbitMqConstants.PeopleServiceQueue}");
ISendEndpoint endPoint = await bus.GetSendEndpoint(sendToUri);
await endPoint.Send<ICompanyRequest>(new {CompanyId = id });
我的服务结构服务中的消费者定义如下:
IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h =>
{
h.Username(RabbitMqConstants.UserName);
h.Password(RabbitMqConstants.Password);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e =>
{
e.Consumer<PersonInformationConsumer>();
});
});
busControl.Start();
这确实允许我在课堂上使用该消息并且我可以很好地处理它。当我们想要使用 IReliableDictonary 或 IReliableQueue 或任何需要引用从 Service Fabric 服务中的 RunAsync 函数运行的上下文时,就会出现问题。
所以我的问题是,我如何配置(是否有可能)MassTransit 以在了解服务上下文本身的有状态 Service Fabric 服务中工作?
提前非常感谢。 迈克
更新 好的,如果我将注册例程指向我的消息消费者类(例如),我已经在这方面取得了一些进展:
ServiceRuntime.RegisterServiceAsync("ServiceType", context => new PersonInformationConsumer(context)).GetAwaiter().GetResult();
ServiceEventSource.Current.ServiceTypeRegistered(Process.GetCurrentProcess().Id, typeof(PersonInformationConsumer).Name);
然后在我的消息的消费者类中,我可以执行以下操作:
internal sealed class PersonInformationConsumer : StatefulService, IConsumer<ICompanyRequest>
{
private static StatefulServiceContext _currentContext;
#region Constructors
public PersonInformationConsumer(StatefulServiceContext serviceContext) : base(serviceContext)
{
_currentContext = serviceContext;
}
public PersonInformationConsumer() : base(_currentContext)
{
}
我现在可以成功调用服务消息了:
ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);
我现在遇到的问题是试图在 IReliableDictionary 上存储一些东西,这样做会导致“对象引用未设置为对象的实例”错误:( ...任何想法都将不胜感激(尽管可能直到新年了!)
public async Task Consume(ConsumeContext<ICompanyRequest> context)
{
ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);
using (ITransaction tx = StateManager.CreateTransaction())
{
try
{
var myDictionary = await StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary");
这是导致错误的原因....帮助! :)
【问题讨论】:
-
我没有安装 Service Fabric SDK,但是看到在异步方法中对 static 类的访问让我停下来,当然还有另一种访问可靠字典的方法实例。
标签: masstransit service-fabric-stateful