【问题标题】:Using saga event to react to a a message published in a consumer使用 saga 事件对消费者中发布的消息做出反应
【发布时间】:2019-03-08 20:56:03
【问题描述】:

我正在将 Mass Transit 与 RabbitMq 和 Automatonymous 一起使用 一个 asp.net core 2.1 应用程序。我将 EntityFramework 核心与 Postgres 一起用于 持久性。

我想做的是在 向 http rest api 发出请求,并在 saga 完成后返回结果。 我正在做的是:

  • 使用具有请求/响应客户端的接口连接一个事件以开始我的传奇
  • 在 saga 中发布一条由消费者消费的消息
  • 在消费者中发布与我的传奇中的另一个事件相对应的消息
  • 在完成并最终确定时从我的传奇中返回响应

这是我的代码:

我的界面

public interface IStartSagaRequest
{
    Guid CorrelationId { get; set; }
    string Name {get; set;}
}

public interface IStartSagaResponse
{
    Guid CorrelationId { get; set; }
    bool DidComplete {get; set;}
}

public IDoOperationRequest
{
    Guid CorrelationId { get; set; }
}

public IOperationComplete
{
    Guid CorrelationId { get; set; }
    bool OperationSuccessful {get; set;}
}

我的传奇实例

public class DoOperationSaga : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public Name { get; set; }
    public string CurrentState { get; set; }
}

用于在状态机中发布的 IDoOperationRequest 的具体实现

public class DoOperationRequestImpl : IDoOperationRequest
{
    public Guid CorrelationId { get; set; }
}

用于在状态机中发布的 IStartSagaResponse 的具体实现

public class StartSagaResponse : IStartSagaResponse
{
    public Guid CorrelationId { get; set; }
    public bool DidComplete {get; set;}
}

我的状态机

public class ProcessOperationStateMachine : MassTransitStateMachine<DoOperationSaga>
{
    public State OperationPending { get; private set; }
    public State Complete { get; private set; }


    public Event<IOperationComplete> OperationCompleteEvent { get; private set; }
    public Event<IStartSagaRequest> StartSagaRequestEvent { get; private set; }


    public ProcessOperationStateMachine()
    {
        InstanceState(doOperationSagaInstance => doOperationSagaInstance.CurrentState);

        Event(() => StartSagaRequestEvent, eventConfigurator =>
        {
            eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
                    context => context.Message.CorrelationId).SelectId(c => Guid.NewGuid());
        });

        Event(() => OperationCompleteEvent, eventConfigurator =>
        {
            eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
                context => context.Message.CorrelationId);
        });


        Initially(
            When(StartSagaRequestEvent)
                .Then(context =>
                {
                    context.Instance.CorrelationId = context.Data.CorrelationId;
                    context.Instance.Name = context.Data.Name;
                    context.Publish(new DoOperationRequestImpl
                    {
                        CorrelationId = context.Data.CorrelationId
                    });

                })
                .TransitionTo(OperationPending)
        );

        During(OperationPending,
            When(OperationCompleteEvent)
                .Then(context =>
                {
                    // I'm just doing this for debugging
                    context.Instance.Name = "changed in operationComplete";
                })
                .ThenAsync(context => context.RespondAsync(new StartSagaResponse 
                { 
                    CorrelationId = context.Data.CorrelationId,
                    DidComplete = true
                }))
                .Finalize());

}

我的消费者:

public class DoOperationRequestConsumer : IConsumer<ISBDoOperationRequest>
{

    public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
    {
       await context.Publish<IOperationComplete>(new
       {
          CorrelationId = context.Message.CorrelationId,
          OperationSuccessful = true
       });
    }
}

我如何在 Startup.cs 中的 DI 中进行连接

public void ConfigureServices(IServiceCollection services)
{
    stateMachine = new ProcessOperationStateMachine();

    SagaDbContextFactory factory = new SagaDbContextFactory();
    EntityFrameworkSagaRepository<DoOperationSaga> repository = new EntityFrameworkSagaRepository<DoOperationSaga>(factory);

    services.AddMassTransit(x =>
    {

        x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(sbc =>
        {
            IRabbitMqHost host = sbc.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            sbc.ReceiveEndpoint(host, "do-operation", ep =>
            {
                ep.UseMessageRetry(c => c.Interval(2, 100));
                ep.StateMachineSaga(stateMachine, repository);
                ep.Durable = false;
            });

            sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
            {
                ep.Consumer(() => new DoOperationRequestConsumer());
                ep.Durable = false;
            });
        }));
        x.AddConsumer<DoOperationRequestConsumer>();
    });

    services.AddScoped<DoOperationRequestConsumer>();

    services.AddScoped(p =>
        p.GetRequiredService<IBusControl>()
            .CreateRequestClient<IDoOperationRequest, IDoOperationResponse>(
                new Uri("rabbitmq://localhost/do-operation?durable=false"),
                TimeSpan.FromSeconds(30)));

}

并在我的控制器中发出请求:

public IRequestClient<IDoOperationRequest, IDoOperationResponse> _doOperationClient { get; set; }
...
var guid = Guid.NewGuid();
_doOperationClient.Request(new
{
    Name = "from the controller",
    CorrelationId = guid
});

我看到的是我的状态机确实启动了。 When(StartSagaRequestEvent) 确实受到打击 并且发布了 DoOperationRequest 消息。 DoOperationRequestConsumer 确实收到了消息 并发布 IOperationComplete 消息。然而,这就是它停止的地方。我的 IOperationCompleteEvent 在我的状态机中没有被调用。当我查看数据库时,我可以看到我的 saga 实例获取 使用 guid 创建并且 CurrentState 设置为 OperationPending。当我看着我的rabbitmq 管理仪表板我看到一条消息在我的 DoOperationRequestConsumer 完成后发布 IOperationComplete 消息发布。我只是没有看到状态机消耗 IOperationComplete 消息由消费者发布。当我设置断点并检查消费者中的消息时 我确实看到 CorrelationId 设置为与传奇的 CorrelationId 相同的值。

我还尝试在 消费者:

public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
{
    ISendEndpoint sendEndpoint = await context.GetSendEndpoint(new Uri("rabbitmq://localhost/do-operation?durable=false"));

    await sendEndpoint.Send<IOperationComplete>(new
    {
      CorrelationId = context.Message.CorrelationId,
      OperationSuccessful = true
    });
}

但仍然无法建立连接。

我整天都在努力解决这个问题,不知道是什么 我在这里失踪了。如果有人可以就我可能做错的事情给我一些建议,我将不胜感激 它,再次为文字墙感到抱歉,我知道这是分配阅读,但我想清楚我在做什么。 非常感谢!

【问题讨论】:

    标签: c# asp.net-core masstransit automatonymous


    【解决方案1】:

    你的事件correlationId似乎有问题,应该是这样的:

    Event(() => StartSagaRequestEvent, eventConfigurator =>
    {
        eventConfigurator.CorrelateById(context => context.Message.CorrelationId)
            .SelectId(context => context.Message.CorrelationId);
    });
    

    这样它就会初始化为消息的 CorrelationId。

    不相关,但您的端点应该为您的容器使用扩展方法:

    sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
    {
        ep.ConfigureConsumer<DoOperationRequestConsumer>();
        ep.Durable = false;
    });
    

    并使用新的请求客户端,也可以在扩展中进行配置。

    x.AddRequestClient<IDoOperationRequest>(new Uri("rabbitmq://localhost/do-operation?durable=false"));
    

    此外,在您的初始条件下,应删除此行:

    context.Instance.CorrelationId = context.Data.CorrelationId;
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-11-24
      • 1970-01-01
      • 2020-03-09
      • 1970-01-01
      • 2021-05-08
      • 1970-01-01
      • 2020-07-17
      相关资源
      最近更新 更多