【发布时间】:2019-03-08 20:56:03
【问题描述】:
我正在将 Mass Transit 与 RabbitMq 和 Automatonymous 一起使用 一个 asp.net core 2.1 应用程序。我将 EntityFramework 核心与 Postgres 一起用于 持久性。
我想做的是在 向 http rest api 发出请求,并在 saga 完成后返回结果。 我正在做的是:
- 使用具有请求/响应客户端的接口连接一个事件以开始我的传奇
- 在 saga 中发布一条由消费者消费的消息
- 在消费者中发布与我的传奇中的另一个事件相对应的消息
- 在完成并最终确定时从我的传奇中返回响应
这是我的代码:
我的界面
public interface IStartSagaRequest
{
Guid CorrelationId { get; set; }
string Name {get; set;}
}
public interface IStartSagaResponse
{
Guid CorrelationId { get; set; }
bool DidComplete {get; set;}
}
public IDoOperationRequest
{
Guid CorrelationId { get; set; }
}
public IOperationComplete
{
Guid CorrelationId { get; set; }
bool OperationSuccessful {get; set;}
}
我的传奇实例
public class DoOperationSaga : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public Name { get; set; }
public string CurrentState { get; set; }
}
用于在状态机中发布的 IDoOperationRequest 的具体实现
public class DoOperationRequestImpl : IDoOperationRequest
{
public Guid CorrelationId { get; set; }
}
用于在状态机中发布的 IStartSagaResponse 的具体实现
public class StartSagaResponse : IStartSagaResponse
{
public Guid CorrelationId { get; set; }
public bool DidComplete {get; set;}
}
我的状态机
public class ProcessOperationStateMachine : MassTransitStateMachine<DoOperationSaga>
{
public State OperationPending { get; private set; }
public State Complete { get; private set; }
public Event<IOperationComplete> OperationCompleteEvent { get; private set; }
public Event<IStartSagaRequest> StartSagaRequestEvent { get; private set; }
public ProcessOperationStateMachine()
{
InstanceState(doOperationSagaInstance => doOperationSagaInstance.CurrentState);
Event(() => StartSagaRequestEvent, eventConfigurator =>
{
eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
context => context.Message.CorrelationId).SelectId(c => Guid.NewGuid());
});
Event(() => OperationCompleteEvent, eventConfigurator =>
{
eventConfigurator.CorrelateById(doOperationSaga => doOperationSaga.CorrelationId,
context => context.Message.CorrelationId);
});
Initially(
When(StartSagaRequestEvent)
.Then(context =>
{
context.Instance.CorrelationId = context.Data.CorrelationId;
context.Instance.Name = context.Data.Name;
context.Publish(new DoOperationRequestImpl
{
CorrelationId = context.Data.CorrelationId
});
})
.TransitionTo(OperationPending)
);
During(OperationPending,
When(OperationCompleteEvent)
.Then(context =>
{
// I'm just doing this for debugging
context.Instance.Name = "changed in operationComplete";
})
.ThenAsync(context => context.RespondAsync(new StartSagaResponse
{
CorrelationId = context.Data.CorrelationId,
DidComplete = true
}))
.Finalize());
}
我的消费者:
public class DoOperationRequestConsumer : IConsumer<ISBDoOperationRequest>
{
public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
{
await context.Publish<IOperationComplete>(new
{
CorrelationId = context.Message.CorrelationId,
OperationSuccessful = true
});
}
}
我如何在 Startup.cs 中的 DI 中进行连接
public void ConfigureServices(IServiceCollection services)
{
stateMachine = new ProcessOperationStateMachine();
SagaDbContextFactory factory = new SagaDbContextFactory();
EntityFrameworkSagaRepository<DoOperationSaga> repository = new EntityFrameworkSagaRepository<DoOperationSaga>(factory);
services.AddMassTransit(x =>
{
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(sbc =>
{
IRabbitMqHost host = sbc.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
sbc.ReceiveEndpoint(host, "do-operation", ep =>
{
ep.UseMessageRetry(c => c.Interval(2, 100));
ep.StateMachineSaga(stateMachine, repository);
ep.Durable = false;
});
sbc.ReceiveEndpoint(host, "consumer-queue", ep =>
{
ep.Consumer(() => new DoOperationRequestConsumer());
ep.Durable = false;
});
}));
x.AddConsumer<DoOperationRequestConsumer>();
});
services.AddScoped<DoOperationRequestConsumer>();
services.AddScoped(p =>
p.GetRequiredService<IBusControl>()
.CreateRequestClient<IDoOperationRequest, IDoOperationResponse>(
new Uri("rabbitmq://localhost/do-operation?durable=false"),
TimeSpan.FromSeconds(30)));
}
并在我的控制器中发出请求:
public IRequestClient<IDoOperationRequest, IDoOperationResponse> _doOperationClient { get; set; }
...
var guid = Guid.NewGuid();
_doOperationClient.Request(new
{
Name = "from the controller",
CorrelationId = guid
});
我看到的是我的状态机确实启动了。 When(StartSagaRequestEvent) 确实受到打击 并且发布了 DoOperationRequest 消息。 DoOperationRequestConsumer 确实收到了消息 并发布 IOperationComplete 消息。然而,这就是它停止的地方。我的 IOperationCompleteEvent 在我的状态机中没有被调用。当我查看数据库时,我可以看到我的 saga 实例获取 使用 guid 创建并且 CurrentState 设置为 OperationPending。当我看着我的rabbitmq 管理仪表板我看到一条消息在我的 DoOperationRequestConsumer 完成后发布 IOperationComplete 消息发布。我只是没有看到状态机消耗 IOperationComplete 消息由消费者发布。当我设置断点并检查消费者中的消息时 我确实看到 CorrelationId 设置为与传奇的 CorrelationId 相同的值。
我还尝试在 消费者:
public async Task Consume(ConsumeContext<ISBDoOperationRequest> context)
{
ISendEndpoint sendEndpoint = await context.GetSendEndpoint(new Uri("rabbitmq://localhost/do-operation?durable=false"));
await sendEndpoint.Send<IOperationComplete>(new
{
CorrelationId = context.Message.CorrelationId,
OperationSuccessful = true
});
}
但仍然无法建立连接。
我整天都在努力解决这个问题,不知道是什么 我在这里失踪了。如果有人可以就我可能做错的事情给我一些建议,我将不胜感激 它,再次为文字墙感到抱歉,我知道这是分配阅读,但我想清楚我在做什么。 非常感谢!
【问题讨论】:
标签: c# asp.net-core masstransit automatonymous