【发布时间】:2016-08-26 23:27:04
【问题描述】:
我在 fork/join 配置中有一个 saga 设置。
在 saga 上定义的事件
FileMetadataMsgFileReadyMsgSomeOtherMsg
当文件进入单独的侦听器时,进程开始。
- 发布
SagaStart(correlationId) - 发布
FileSavedToMsg(correlationId, fileLoc) - 发布
FileMetadataMsg(correlationId, metadata) - 发布
FileReadyMsg(correlationId, fileLoc)
对文件做一些工作的下游端点
Consumer<FileSavedToMsg>
- 发布
SomeOtherMsg(GotTheFileMsg.correlationId, data)
我在 saga_skipped 队列中获得了 FileSavedToMsg。我只能假设这是由于 FileSavedToMsg 上有一个correlationId,因为 saga 本身没有在其状态机中使用 FileSavedToMsg 并且没有Event<FileSavedToMsg>。
如果这就是为什么...我应该在 CorrelationId 以外的字段中传递correlationId,所以传奇看不到它吗?我在某个地方需要它,所以我可以用它标记 SomeOtherMsg。
下面是定义 saga 端点的方式
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint(host, "study_saga", epCfg =>
{
epCfg.StateMachineSaga(machine, repository);
});
});
以下是工作人员端点的定义方式
return Bus.Factory.CreateUsingRabbitMq(x =>
{
var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
x.ReceiveEndpoint(host, "study_3d_volume_worker", c =>
{
c.PrefetchCount = 1;
c.Instance(_studyCreatedMsgConsumer);
});
});
它们在同一台机器上运行,但在单独的控制台/Topshelf 应用程序中。
【问题讨论】:
-
你的传奇和你的消费者在同一个队列中吗?它们应该位于具有单独队列的单独接收端点上。很可能你有一个来自
FileSavedToMsgexchane 的传奇队列上的绑定导致它。 -
@ChrisPatterson 我已经为 saga 和“worker”添加了端点配置。
-
你能看看 RabbitMQ 上
study_saga交换的绑定,看看有哪些交换入站吗? -
@ChrisPatterson 您的最后一条评论导致了修复。发生的事情是,最初使用 FileSavedToMsg 创建了交换,但经过一些更改后,saga 被更改为使用 FileReadyMsg。该事件已从传奇中删除,我没有意识到它在交易所中仍然存在。我在另一个队列/交换中重用了 FileSavedToMsg。实际上,它搞砸了路由并创建了跳过的队列。如果您将先前的评论移至答案,我将对其进行标记。感谢您的帮助。
-
希望将来对某人有所帮助。我使用基于 RabbitMQ Web 浏览器的插件来检查并随后删除有问题的交换绑定。如果没有它,我将很难意识到交换仍然绑定到我想要删除的消息类型。
标签: masstransit