【问题标题】:Publishing to local subscribers with NServiceBus使用 NServiceBus 发布到本地订阅者
【发布时间】:2012-04-05 04:30:21
【问题描述】:

我在this 问题中有一个使用 NServiceBus 的用例。本质上,ImportProductCommand 消息有一个消息处理程序。通常,在另一个处理程序接收到指定要导入的产品集的 ImportProductsCommand 后,会将这些消息的集合发送到处理程序。需要知道集合导入的状态,因此使用 saga 来管理导入状态。 ImportProductCommand 的处理程序在完成后发布 ProductImportedEvent 消息和 ProductImportFailedEvent 消息。 saga 使用第一次收到消息时分配给消息的 ID 订阅那些将它们与启动 ImportProductsCommand 消息相关联的消息。

public class ProductImportSaga : NServiceBus.Saga.Saga<ProductImportSagaData>,
    IAmStartedByMessages<ImportProductsCommand>,
    IHandleMessages<IProductImportedEvent>,
    IHandleMessages<IProductImportFailedEvent>
{
    public override void ConfigureHowToFindSaga()
    {
        ConfigureMapping<IProductImportedEvent>(x => x.ImportId, x => x.CorrelationId);
        ConfigureMapping<IProductImportFailedEvent>(x => x.ImportId, x => x.CorrelationId);
    }

    public void Handle(ImportProductsCommand message)
    {
        this.Data.ImportId = message.Id;
        this.Data.Total = message.SKUs.Length;
        foreach (var command in message.SKUs.Select(sku => new ImportProductCommand(message.SupplierId, sku, message.ImportImage, message.Id)))
            this.Bus.SendLocal(command);    // send to local handler shown below
    }

    public void Handle(IProductImportedEvent message)
    {
        this.Data.OnCompleted();
    }

    public void Handle(IProductImportFailedEvent message)
    {
        this.Data.OnFailed();
    }
}

各个 ImportProductCommand 消息的处理程序如下所示:

// handles messages sent by saga above (or sent individually by WCF service)
public class ImportProductHandler : IHandleMessages<ImportProductCommand>
{
    public IBus Bus { get; set; } 

    public void Handle(ImportProductCommand message)
    {
        // importing logic here and upon success:
        if (success)
        {
           this.Bus.Publish(new ProductImportedEvent(correlationId: message.Id));
        }
        else
        {
           this.Bus.Publish(new ProductImportFailedEvent(correlationId: message.Id));
        }
    }
}

这个问题是,当事件消息发布时,它们被放置在与托管单个处理程序和 saga 的进程相关联的队列中。发生这种情况时,队列中可能有很多消息最初由 saga 发送以响应 ImportProductsMessage。这意味着 saga 在处理完排队的 ImportProductCommand 消息之前不会收到这些事件,因此批量导入的处理状态不会及时更新。如果我在不同的进程中托管 saga,那么它将接收消息,而无需等待命令队列处理。有没有办法在同一进程中托管处理程序和传奇时达到相同的效果?基本上,我希望事件消息的处理顺序与 ImportProductCommand 消息不同,即使它们在同一个队列中,这样 saga 可以处理这些事件并相应地更新其状态。这是可能的还是有更好的方法来实现这个结果?我尝试使用 First&lt;T&gt; 指定消息处理程序排序但无济于事,而且为密切相关的逻辑部署两个不同的主机似乎有点矫枉过正。

【问题讨论】:

    标签: nservicebus


    【解决方案1】:

    NSB 没有优先级的概念,因此通常使用另一个端点来执行工作。听起来您所追求的是工作分配,您可能想看看Distributor。在该模型中,Saga 将维护整个工作单元的状态,而每个端点将处理实际处理。如果事情开始变慢,这将允许您动态添加其他端点。

    如果您不想实现完整的 Distributor,那么至少将实际工作推送到另一个端点会减轻对优先级的任何需求。

    【讨论】:

    • 我认为使用单独的端点可能是可行的方法,因为还有另一个要求是支持手动请求的单个导入优先于批量导入命令排队的单个导入。
    • 然后您可以指定其中一个节点仅处理手动导入并通过某些 UI 与其交互。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-09-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多