【问题标题】:MassTransit pub/sub with MSMQ using使用 MSMQ 的 MassTransit 发布/订阅
【发布时间】:2015-03-29 15:38:31
【问题描述】:

我刚开始使用 MassTransit,找不到适合初学者的任何合适的文档。 我确实在http://looselycoupledlabs.com/2014/06/masstransit-publish-subscribe-example/ 找到了一些示例代码,它显示了一个使用 RabbitMQ 的简单 MassTransit 发布/订阅示例

但对于我的公司,我需要使用 MSMQ。 所以我删除了 RabbitMQ 引用:

x.UseRabbitMq();
x.ReceiveFrom("rabbitmq://localhost/MtPubSubExample_" + queueName);

并将它们改为使用 MSMQ:

x.UseMsmq();
x.ReceiveFrom("msmq://localhost/MtPubSubExample_" + queueName);

我在启动订阅者和发布者时都没有收到错误,我可以在发布者处输入消息,但它们似乎没有到达订阅者,消费代码从未被调用。

配置:

namespace Configuration
{
  public class BusInitializer
  {
    public static IServiceBus CreateBus(string queueName, Action<ServiceBusConfigurator> moreInitialization)
    {
      Log4NetLogger.Use();
      var bus = ServiceBusFactory.New(x =>
      {
        x.UseMsmq();
        x.ReceiveFrom("msmq://localhost/MtPubSubExample_" + queueName);
        moreInitialization(x);
      });

      return bus;
    }
  }
}

出版商:

static void Main(string[] args)
    {
      var bus = BusInitializer.CreateBus("TestPublisher", x => { });
      string text = "";

      while (text != "quit")
      {
        Console.Write("Enter a message: ");
        text = Console.ReadLine();

        var message = new SomethingHappenedMessage() { What = text, When = DateTime.Now };
        bus.Publish<SomethingHappened>(message, x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
      }

      bus.Dispose();
    }

订阅者:

static void Main(string[] args)
    {
      var bus = BusInitializer.CreateBus("TestSubscriber", x =>
      {
        x.Subscribe(subs =>
        {
          subs.Consumer<SomethingHappenedConsumer>().Permanent();
        });
      });

      Console.ReadKey();

      bus.Dispose();
    }

未被调用的消费者代码:

class SomethingHappenedConsumer : Consumes<SomethingHappened>.Context
  {
    public void Consume(IConsumeContext<SomethingHappened> message)
    {
      Console.Write("TXT: " + message.Message.What);
      Console.Write("  SENT: " + message.Message.When.ToString());
      Console.Write("  PROCESSED: " + DateTime.Now.ToString());
      Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString() + ")");
    }
  }

我还以为我可以看到存储在 msmq 中的消息,但是私有队列是空的。

我现在头疼了 2 天,一定是遗漏了一些明显的东西;非常感谢任何帮助。

我的环境:Windows 8.1 Prof. 与 VS 2013 Prof.

【问题讨论】:

  • 你能显示moreInitialization(x);中的内容吗?
  • 您好,谢谢,仅适用于订阅者,在 x => 之后: var bus = BusInitializer.CreateBus("TestSubscriber", x => { x.Subscribe(subs => { subs.Consumer().Permanent(); });

标签: msmq masstransit


【解决方案1】:

缺少的部分是订阅分发 - 这是跟踪哪些消费者注册到哪些消息并路由消息的组件。

正如docs 所说:

在本地总线上创建订阅后,此信息随后 需要在您的所有不同总线实例之间共享 应用网络。

虽然路由数据相同,但这些信息如何到达所有 节点的数量因您的传输配置而异。

正如页面继续解释的那样,对于 MSMQ,您有两个选择:

  1. MSMQ 多播
  2. UseSubscriptionService (MassTransit.RuntimeServices)

多播是only really meant for development and not for production。请注意,没有永久订阅和messages can be lost during startup

订阅服务(又名 MassTransit.RuntimeServices)是 not distributed with NuGet,因此您需要找到二进制文件或从源代码编译。它作为 Windows 服务运行,并且需要一个数据库来跟踪订阅。

我相信在运行 RabbitMQ 或 Azure 服务总线时这些都不需要,它们是即将推出的 3.0 版支持的传输(Rabbit MQ 在 2.x 的最新版本中比 MSMQ 更受支持,并且版本 3 放弃了对 MSMQ 的支持)

【讨论】:

    猜你喜欢
    • 2011-11-02
    • 1970-01-01
    • 1970-01-01
    • 2011-03-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多