【问题标题】:MassTransit: How to stop the bus after all messages are consumed by consumer?MassTransit:在消费者消费完所有消息后如何停止公共汽车?
【发布时间】:2019-09-25 21:44:56
【问题描述】:

我尝试在使用 MassTransit 消耗队列中的所有消息后停止总线。我将并发消息限制设置为 1,因为我的消费者需要一次处理一条消息。

我尝试将 bus.StopAsync() 放在 bus.StartAsync 后面,如下所示。结果显示,一条消息被消费后,总线就会停止。

总线配置:

IBusControl bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                IRabbitMqHost host = cfg.Host(new Uri("rabbitmq://localhost"), hostConfigurator =>
                {
                    hostConfigurator.Username("username");
                    hostConfigurator.Password("password");
                });

            cfg.ReceiveEndpoint(host, "MyResult", ep =>
            {
                ep.Bind("MyExchange", s => { s.Durable = true; });

                ep.Consumer<MessageConsumer>(mc =>
                {
                    mc.UseConcurrentMessageLimit(1);
                });
            });
        });

公交车起停:

await bus.StartAsync();

await bus.StopAsync();

我的问题是如何在队列中的所有消息都被消耗后停止总线。我对 MassTransit 很陌生,对调用消费者和停止公共汽车的顺序非常好奇。感谢有人可以提供帮助。谢谢。

【问题讨论】:

    标签: masstransit


    【解决方案1】:

    在 Testing 命名空间中,有一个功能用于监视总线上的活动,可用于发出没有消息被消费的信号(之后,您可以按照您的建议停止总线)。

    你可以看到单元测试: https://github.com/MassTransit/MassTransit/blob/v7.0.3/tests/MassTransit.Tests/BusActivityMonitor_Specs.cs#L53

    观察者被添加使用:

    var activityMonitor = bus.CreateBusActivityMonitor(TimeSpan.FromMilliseconds(500));
    

    一旦总线空闲,超时就会为真:

    var timeout = await activityMonitor.AwaitBusInactivity(TimeSpan.FromSeconds(10));
    

    如果timeout 为真,则在指定时间内总线上没有活动。

    【讨论】:

      猜你喜欢
      • 2017-12-21
      • 2017-09-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多