【发布时间】:2012-04-05 04:30:21
【问题描述】:
我在this 问题中有一个使用 NServiceBus 的用例。本质上,ImportProductCommand 消息有一个消息处理程序。通常,在另一个处理程序接收到指定要导入的产品集的 ImportProductsCommand 后,会将这些消息的集合发送到处理程序。需要知道集合导入的状态,因此使用 saga 来管理导入状态。 ImportProductCommand 的处理程序在完成后发布 ProductImportedEvent 消息和 ProductImportFailedEvent 消息。 saga 使用第一次收到消息时分配给消息的 ID 订阅那些将它们与启动 ImportProductsCommand 消息相关联的消息。
public class ProductImportSaga : NServiceBus.Saga.Saga<ProductImportSagaData>,
IAmStartedByMessages<ImportProductsCommand>,
IHandleMessages<IProductImportedEvent>,
IHandleMessages<IProductImportFailedEvent>
{
public override void ConfigureHowToFindSaga()
{
ConfigureMapping<IProductImportedEvent>(x => x.ImportId, x => x.CorrelationId);
ConfigureMapping<IProductImportFailedEvent>(x => x.ImportId, x => x.CorrelationId);
}
public void Handle(ImportProductsCommand message)
{
this.Data.ImportId = message.Id;
this.Data.Total = message.SKUs.Length;
foreach (var command in message.SKUs.Select(sku => new ImportProductCommand(message.SupplierId, sku, message.ImportImage, message.Id)))
this.Bus.SendLocal(command); // send to local handler shown below
}
public void Handle(IProductImportedEvent message)
{
this.Data.OnCompleted();
}
public void Handle(IProductImportFailedEvent message)
{
this.Data.OnFailed();
}
}
各个 ImportProductCommand 消息的处理程序如下所示:
// handles messages sent by saga above (or sent individually by WCF service)
public class ImportProductHandler : IHandleMessages<ImportProductCommand>
{
public IBus Bus { get; set; }
public void Handle(ImportProductCommand message)
{
// importing logic here and upon success:
if (success)
{
this.Bus.Publish(new ProductImportedEvent(correlationId: message.Id));
}
else
{
this.Bus.Publish(new ProductImportFailedEvent(correlationId: message.Id));
}
}
}
这个问题是,当事件消息发布时,它们被放置在与托管单个处理程序和 saga 的进程相关联的队列中。发生这种情况时,队列中可能有很多消息最初由 saga 发送以响应 ImportProductsMessage。这意味着 saga 在处理完排队的 ImportProductCommand 消息之前不会收到这些事件,因此批量导入的处理状态不会及时更新。如果我在不同的进程中托管 saga,那么它将接收消息,而无需等待命令队列处理。有没有办法在同一进程中托管处理程序和传奇时达到相同的效果?基本上,我希望事件消息的处理顺序与 ImportProductCommand 消息不同,即使它们在同一个队列中,这样 saga 可以处理这些事件并相应地更新其状态。这是可能的还是有更好的方法来实现这个结果?我尝试使用 First<T> 指定消息处理程序排序但无济于事,而且为密切相关的逻辑部署两个不同的主机似乎有点矫枉过正。
【问题讨论】:
标签: nservicebus