【问题标题】:Rebus / Content based routingRebus / 基于内容的路由
【发布时间】:2016-11-23 01:11:56
【问题描述】:

如您所见,有一个总部作为根节点,一些分支作为子节点。有Data类型的消息,我想根据Data对象的内容发布消息,例如:

if (data.value == xxxx) publish(data, Br1, Br2)
else if (data.value == yyyy) publish(data, Br3, Br4)
else if (data.value == zzzz) publis(data, Br5, Br6)

这是 pub/sub 模式的某种定制版本。但是我想根据消息的内容将数据类型的消息发布给一些特殊的订阅者。

Rebus 有解决方案吗?

【问题讨论】:

    标签: rebus


    【解决方案1】:

    Rebus 中有几种解决方案 :)

    对于您的场景,我可以看到两种解决方法:1)使用自定义主题,或 2)实现真正的基于内容的路由器。

    如果有意义,您可以使用 Rebus 的主题 API 来处理路由,从而使用主题对这个发布/订阅场景进行建模。如果您可以说您的每条数据消息都属于某个类别,那么您的订阅者可以订阅这些类别,这是有道理的。

    与“真实”的基于主题的排队系统相比,例如RabbitMQ,Rebus 中的topics API 很粗糙。它不允许使用通配符 (*) 或任何类似的高级内容 - 主题只是简单的字符串,您可以订阅这些字符串,然后将其用作发布/订阅频道以将事件路由到多个订阅者。

    订阅者端可以这样使用:

    await bus.Advanced.Topics.Subscribe("department_a");
    

    然后在发布者的最后:

    var data = new Data(...);
    
    await bus.Advanced.Topics.Publish("department_a", data);
    

    如果还是不行,您可以插入一个“真正的”基于内容的路由器,它只是您await bus.Send(eachDataMessage) 的端点,然后将消息转发给相关订阅者。

    根据您的要求,可以使用 Rebus 在两个级别上完成。如果查看消息的标头就足够了,您应该将其实现为“传输消息转发器”,因为它会跳过反序列化并提供一个很好的 API 来简单地转发消息:

    Configure.With(...)
        .Transport(t => t.UseMsmq("router"))
        .Routing(r => {
            r.AddTransportMessageForwarder(async transportMessage => {
                var headers = transportMessage.Headers;
    
                var subscribers = Decide(headers);
    
                return ForwardAction.ForwardTo(subscribers);
            });
        })
        .Start();
    

    如果你需要查看实际的消息,你应该只实现一个普通的消息处理程序,然后使用总线转发消息:

    public class Router : IHandleMessages<Data>
    {
        readonly IBus _bus;
    
        public Router(IBus bus)
        {
            _bus = bus;
        }   
    
        public async Task Handle(Data message)
        {
            var subscribers = Decide(message);
    
            foreach(var subscriber in subscribers)
            {
                await _bus.Advanced.TransportMessage.ForwardTo(subscriber);
            }
        }
    }
    

    自定义实现的路由器是最灵活的解决方案,因为您可以实现任何您喜欢的逻辑,但正如您所见,它涉及的内容稍微多一些。


    (*) Rebus 通常不允许使用通配符,尽管它确实将主题直接传递给 RabbitMQ,如果你碰巧使用它作为传输,这意味着你实际上可以充分利用 RabbitMQ(有关更多信息,请参阅 this issue

    【讨论】:

    • 谢谢。关于最后一个解决方案(路由器/处理程序),我编写了这些代码行,但是在转发一些数据包对象后,处理程序出现异常。 (总线为空)
    【解决方案2】:
        static void Main()
        {
    
            using (var activator = new BuiltinHandlerActivator())
            {
                activator.Handle<Packet>(async (bus, packet) =>
                {
                    string subscriber = "subscriberA";
                    await bus.Advanced.TransportMessage.Forward(subscriber); 
                });
    
                Configure.With(activator)
                    .Logging(l => l.ColoredConsole(minLevel: LogLevel.Warn))
                    .Transport(t => t.UseMsmq("router"))
                    .Start();
    
                for (int i = 0; i < 10; i++)
                {
                    activator.Bus.SendLocal(
                        new Packet()
                        {
                            ID = i,
                            Content = "content" + i.ToString(),
                            Sent = false,
                        }).Wait();
                }
            }
    
            Console.ReadLine();
        }
    

    【讨论】:

    • 你会得到一个NullReferenceException,因为你处理BuiltinHandlerActivator 太早了——如果你在for-loop 存在之后Console.ReadLine();,你会得到另一个错误,说明队列@ 987654327@不存在
    • 愚蠢的我。谢谢。 :)
    • 是的 :) 你可以阅读它here on the wiki page about transactions
    • 非常感谢您的帮助。 :) 最后一条评论中的代码是我的订阅方。我的订阅者在事务范围内使用消息,因为它处理数据库。从发布者发送的数据包必须保存在订阅者数据库中。一切正常。但有时,我手动终止订阅者控制台应用程序,然后再次运行它(模拟消费者应用程序中的致命错误),重复此操作后,引发 PK_Violation 错误,表示该消息在下一次运行之前和下一次运行时已处理,它引发异常.我错过了什么吗?
    【解决方案3】:
    using (var trScope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
       scope.EnlistRebus();
       Packet packet = ReadFromDB()
       activator.Bus.SendLocal(packet).Wait()
       scope.Complete()
    }
    
    
    activator.Handle<Packet>(async (bus, packet) =>
    {
       string subscriber = "subscriberA";
       await bus.Advanced.TransportMessage.Forward(subscriber); 
    });
    

    【讨论】:

      【解决方案4】:
              using (var activator = new BuiltinHandlerActivator())
              {
                  activator.Handle<Packet>(async message =>
                  {
                      string connectionString =
                          "Data Source=.;Initial Catalog=Rebus;User ID=sa;Password=123456";
      
                      using (SqlConnection connection = new SqlConnection(connectionString))
                      {
                          string queryString = @"INSERT INTO CLIENTPACKET(ID, CONTENT, SENT) VALUES(@id, @content, @sent)";
                          connection.Open();
      
                          using (SqlCommand command = new SqlCommand(queryString, connection))
                          {
                              command.Parameters.Add(new SqlParameter("@id", message.ID));
                              command.Parameters.Add(new SqlParameter("@content", message.Content));
                              command.Parameters.Add(new SqlParameter("@sent", message.Sent));
      
                              await command.ExecuteNonQueryAsync();
                          }
                      }
                  });
      
      
                  Configure.With(activator)
                      .Logging(l => l.ColoredConsole(minLevel: LogLevel.Warn))
                      .Transport(t => t.UseMsmq(@"subscriberA"))
                      .Routing(r => r.TypeBased().MapAssemblyOf<Packet>("router"))
                      .Options(o =>
                      {
                          TransactionOptions tranOp = new TransactionOptions();
                          tranOp.IsolationLevel = IsolationLevel.ReadCommitted;
                          o.HandleMessagesInsideTransactionScope(tranOp);
      
                          o.SetNumberOfWorkers(2);
                          o.SetMaxParallelism(2);
                      })
                      .Start();
      
                  activator.Bus.Subscribe<Packet>().Wait();
      
                  Console.WriteLine("Press ENTER to quit");
                  Console.ReadLine();
              }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2017-11-26
        • 1970-01-01
        • 2017-05-05
        • 2021-12-06
        • 2021-03-26
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多