【问题标题】:Why am I getting messages in the skipped queue为什么我在跳过的队列中收到消息
【发布时间】:2016-08-26 23:27:04
【问题描述】:

我在 fork/join 配置中有一个 saga 设置。

在 saga 上定义的事件

  • FileMetadataMsg
  • FileReadyMsg
  • SomeOtherMsg

当文件进入单独的侦听器时,进程开始。

  • 发布SagaStart(correlationId)
  • 发布FileSavedToMsg(correlationId, fileLoc)
  • 发布FileMetadataMsg(correlationId, metadata)
  • 发布FileReadyMsg(correlationId, fileLoc)

对文件做一些工作的下游端点

Consumer<FileSavedToMsg>

  • 发布SomeOtherMsg(GotTheFileMsg.correlationId, data)

我在 saga_skipped 队列中获得了 FileSavedToMsg。我只能假设这是由于 FileSavedToMsg 上有一个correlationId,因为 saga 本身没有在其状态机中使用 FileSavedToMsg 并且没有Event<FileSavedToMsg>

如果这就是为什么...我应该在 CorrelationId 以外的字段中传递correlationId,所以传奇看不到它吗?我在某个地方需要它,所以我可以用它标记 SomeOtherMsg。

下面是定义 saga 端点的方式

return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });              

            cfg.ReceiveEndpoint(host, "study_saga", epCfg =>
            {
                epCfg.StateMachineSaga(machine, repository);
            });
});

以下是工作人员端点的定义方式

return Bus.Factory.CreateUsingRabbitMq(x =>
{
    var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        x.ReceiveEndpoint(host, "study_3d_volume_worker", c =>
        {
            c.PrefetchCount = 1;
            c.Instance(_studyCreatedMsgConsumer);
        });
 });

它们在同一台机器上运行,但在单独的控制台/Topshelf 应用程序中。

【问题讨论】:

  • 你的传奇和你的消费者在同一个队列中吗?它们应该位于具有单独队列的单独接收端点上。很可能你有一个来自FileSavedToMsg exchane 的传奇队列上的绑定导致它。
  • @ChrisPatterson 我已经为 saga 和“worker”添加了端点配置。
  • 你能看看 RabbitMQ 上 study_saga 交换的绑定,看看有哪些交换入站吗?
  • @ChrisPatterson 您的最后一条评论导致了修复。发生的事情是,最初使用 FileSavedToMsg 创建了交换,但经过一些更改后,saga 被更改为使用 FileReadyMsg。该事件已从传奇中删除,我没有意识到它在交易所中仍然存在。我在另一个队列/交换中重用了 FileSavedToMsg。实际上,它搞砸了路由并创建了跳过的队列。如果您将先前的评论移至答案,我将对其进行标记。感谢您的帮助。
  • 希望将来对某人有所帮助。我使用基于 RabbitMQ Web 浏览器的插件来检查并随后删除有问题的交换绑定。如果没有它,我将很难意识到交换仍然绑定到我想要删除的消息类型。

标签: masstransit


【解决方案1】:

如果您在队列上获取的消息未被该接收端点上的消费者消费,则可能是您之前正在消费该消息类型并将其从消费者(或 saga,在您的情况下)中删除,或者您正在出于其他目的使用队列,并且它使用了该消息类型。

无论哪种方式,如果您进入 RabbitMQ 管理控制台并查找队列,则可以展开 Bindings chevron,单击以转到同名交换(这是标准的 MassTransit 约定),然后展开绑定以查看哪些消息类型(命名为 .NET 类型名称的交换)绑定到该交换。

如果您看到一个没有被端点消耗的,那就是罪魁祸首。您可以使用 UI 解绑它,之后发布的消息将不再发送到队列。

【讨论】:

  • 如果我在请求/响应场景中有多个消费者并且请求正在对所有消费者进行扇出怎么办?每个消费者都在响应同一个请求,只有第一个响应有效,所有其他响应(或消息)都存储在 _skipped 队列中。如果消费者正在响应已经得到响应的请求,我该如何避免创建 _skipped 队列?
猜你喜欢
  • 1970-01-01
  • 2012-11-06
  • 2011-09-03
  • 1970-01-01
  • 2019-08-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-29
相关资源
最近更新 更多