Rebus 中有几种解决方案 :)
对于您的场景,我可以看到两种解决方法:1)使用自定义主题,或 2)实现真正的基于内容的路由器。
如果有意义,您可以使用 Rebus 的主题 API 来处理路由,从而使用主题对这个发布/订阅场景进行建模。如果您可以说您的每条数据消息都属于某个类别,那么您的订阅者可以订阅这些类别,这是有道理的。
与“真实”的基于主题的排队系统相比,例如RabbitMQ,Rebus 中的topics API 很粗糙。它不允许使用通配符 (*) 或任何类似的高级内容 - 主题只是简单的字符串,您可以订阅这些字符串,然后将其用作发布/订阅频道以将事件路由到多个订阅者。
订阅者端可以这样使用:
await bus.Advanced.Topics.Subscribe("department_a");
然后在发布者的最后:
var data = new Data(...);
await bus.Advanced.Topics.Publish("department_a", data);
如果还是不行,您可以插入一个“真正的”基于内容的路由器,它只是您await bus.Send(eachDataMessage) 的端点,然后将消息转发给相关订阅者。
根据您的要求,可以使用 Rebus 在两个级别上完成。如果查看消息的标头就足够了,您应该将其实现为“传输消息转发器”,因为它会跳过反序列化并提供一个很好的 API 来简单地转发消息:
Configure.With(...)
.Transport(t => t.UseMsmq("router"))
.Routing(r => {
r.AddTransportMessageForwarder(async transportMessage => {
var headers = transportMessage.Headers;
var subscribers = Decide(headers);
return ForwardAction.ForwardTo(subscribers);
});
})
.Start();
如果你需要查看实际的消息,你应该只实现一个普通的消息处理程序,然后使用总线转发消息:
public class Router : IHandleMessages<Data>
{
readonly IBus _bus;
public Router(IBus bus)
{
_bus = bus;
}
public async Task Handle(Data message)
{
var subscribers = Decide(message);
foreach(var subscriber in subscribers)
{
await _bus.Advanced.TransportMessage.ForwardTo(subscriber);
}
}
}
自定义实现的路由器是最灵活的解决方案,因为您可以实现任何您喜欢的逻辑,但正如您所见,它涉及的内容稍微多一些。
(*) Rebus 通常不允许使用通配符,尽管它确实将主题直接传递给 RabbitMQ,如果你碰巧使用它作为传输,这意味着你实际上可以充分利用 RabbitMQ(有关更多信息,请参阅 this issue)