【问题标题】:MassTransit And Service Fabric Stateful Service?MassTransit 和 Service Fabric 状态服务?
【发布时间】:2016-12-29 07:37:45
【问题描述】:

我一直在尝试制作一个网站演示,该演示使用 MassTransit 和 RabbitMQ 将消息发布到在 Service Fabric 上作为有状态服务运行的服务。

一切顺利,我的客户会发消息:

IBusControl bus = BusConfigurator.ConfigureBus();
Uri sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}" + $"{RabbitMqConstants.PeopleServiceQueue}");
ISendEndpoint endPoint = await bus.GetSendEndpoint(sendToUri);

await endPoint.Send<ICompanyRequest>(new {CompanyId = id });

我的服务结构服务中的消费者定义如下:

        IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h =>
            {
                h.Username(RabbitMqConstants.UserName);
                h.Password(RabbitMqConstants.Password);
            });

            cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e =>
            {
                e.Consumer<PersonInformationConsumer>();
            });

        });

        busControl.Start();

这确实允许我在课堂上使用该消息并且我可以很好地处理它。当我们想要使用 IReliableDictonary 或 IReliableQueue 或任何需要引用从 Service Fabric 服务中的 RunAsync 函数运行的上下文时,就会出现问题。

所以我的问题是,我如何配置(是否有可能)MassTransit 以在了解服务上下文本身的有状态 Service Fabric 服务中工作?

提前非常感谢。 迈克

更新 好的,如果我将注册例程指向我的消息消费者类(例如),我已经在这方面取得了一些进展:

ServiceRuntime.RegisterServiceAsync("ServiceType", context => new PersonInformationConsumer(context)).GetAwaiter().GetResult();
                ServiceEventSource.Current.ServiceTypeRegistered(Process.GetCurrentProcess().Id, typeof(PersonInformationConsumer).Name);

然后在我的消息的消费者类中,我可以执行以下操作:

    internal sealed class PersonInformationConsumer : StatefulService, IConsumer<ICompanyRequest>
{
    private static StatefulServiceContext _currentContext;

    #region Constructors

    public PersonInformationConsumer(StatefulServiceContext serviceContext) : base(serviceContext)
    {
        _currentContext = serviceContext;
    }

    public PersonInformationConsumer() : base(_currentContext)
    {
    }

我现在可以成功调用服务消息了:

ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);

我现在遇到的问题是试图在 IReliableDictionary 上存储一些东西,这样做会导致“对象引用未设置为对象的实例”错误:( ...任何想法都将不胜感激(尽管可能直到新年了!)

        public async Task Consume(ConsumeContext<ICompanyRequest> context)
    {

        ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);

        using (ITransaction tx = StateManager.CreateTransaction())
        {

            try
            {

                var myDictionary = await StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary");

这是导致错误的原因....帮助! :)

【问题讨论】:

  • 我没有安装 Service Fabric SDK,但是看到在异步方法中对 static 类的访问让我停下来,当然还有另一种访问可靠字典的方法实例。

标签: masstransit service-fabric-stateful


【解决方案1】:

您需要做更多工作才能使 MassTransit 和有状态服务协同工作,这里有一些问题需要您自己关注。

只有有状态分区中的主节点(n 个分区中的 n 个主节点)才能写入/更新有状态服务,所有副本在尝试写回任何状态时都会抛出异常。所以你需要处理这个问题,从表面上看,这听起来很容易,直到你考虑到由于重新平衡集群,master 可以在集群中移动,一般服务结构应用程序的默认设置是关闭在副本上进行处理,并且只在主服务器上运行工作。这都是通过 RunAsync 方法完成的(试一试,在 RunAsync 方法中运行 3 个有状态的服务,然后终止 master)。

还需要考虑对数据进行分区,由于有状态服务随分区扩展,因此您需要创建一种方法来将数据分发到服务总线上的单独端点,也许有一个单独的队列,只监听一个给定分区范围?假设您有一条 UserCreated 消息,您可以将其拆分为 country UK 转到分区 1,US 转到分区 2 等等...

如果您只是想让一些基本的东西启动并运行,我会将其限制在一个分区中,然后尝试将您的总线创建放在 RunAsync 中,并在取消令牌请求取消时关闭总线。

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h =>
        {
            h.Username(RabbitMqConstants.UserName);
            h.Password(RabbitMqConstants.Password);
        });

        cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e =>
        {
            // Pass in the stateful service context
            e.Consumer(c => new PersonInformationConsumer(Context));
        });

    });

    busControl.Start();

    while (true)
    {
        if(cancellationToken.CancellationRequested)
        {
            //Service Fabric wants us to stop
            busControl.Stop();

            cancellationToken.ThrowIfCancellationRequested();
        }

        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}

【讨论】:

猜你喜欢
  • 1970-01-01
  • 2016-11-16
  • 2017-06-22
  • 2017-12-13
  • 2017-03-07
  • 2018-04-09
  • 2018-03-26
  • 2019-09-10
  • 2017-06-26
相关资源
最近更新 更多