【问题标题】:Trying to get the message from the MassTransit consumer试图从 MassTransit 消费者那里获取消息
【发布时间】:2017-01-19 15:50:06
【问题描述】:

我正在使用 Masstransit 和 RabbitMQ 发布事件(没有消费者,仅使用发布者),目前我正在尝试创建一个集成测试来验证消息是否已发布,如果是,我想检查它是否是正确的消息。为此,我创建了一个消费者来使用队列中的消息并将其与我的预期进行比较。这里的问题是,我无法使用该消息。活动已成功发布,但我无法收到消息。

这是负责连接消费者的类

public class ServiceBusHelper
{

    private IBusControl bus;
    private readonly string serviceBusQueueName = ConfigurationManager.AppSettings["ServiceBusQueuename"];
    private readonly string serviceBusEndpoint = ConfigurationManager.AppSettings["ServiceBusEndPoint"];
    private readonly string serviceBusUsername = ConfigurationManager.AppSettings["ServiceBusUsername"];
    private readonly string serviceBusPassword = ConfigurationManager.AppSettings["ServiceBusPassword"];

    public ConnectHandle HandleObserver { get; set; }

    public void ConnectRabbitMQ()
    {
        bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(
                new Uri(serviceBusEndpoint),
                h =>
                {
                    h.Username(serviceBusUsername);
                    h.Password(serviceBusPassword);
                });

            cfg.ReceiveEndpoint(
            serviceBusQueueName,
            e =>
            {

                e.Consumer<ServiceBusEventsHelper>();

            });
        });

        //Observer
        var observer = new PublishedObserverHelper();
        HandleObserver = bus.ConnectPublishObserver(observer);

    }


}

这是消费消息的类

public class ServiceBusEventsHelper : IConsumer<ITransportCreatedEvent>
{
    public ITransportCreatedEvent Result { get; set; }


    public async Task Consume(ConsumeContext<ITransportCreatedEvent> context)
    {
        Result = await Task.FromResult(context.Message);

    }


}

在测试方法中我有这个

            ServiceBusEventsHelper eventHelper = new ServiceBusEventsHelper();
        ServiceBusHelper busHelper = new ServiceBusHelper();

        try
        {
            busHelper.ConnectRabbitMQ();
            transportResponseDto = await this.transportClient.CreateTransportAsync(transportRequest);
            handle = busHelper.HandleObserver;

            var eventResponse = eventHelper.Result;// Allways NULL
        }
        catch (Exception ex)
        {
            Assert.Fail(ex.GetDetailMessage());
        }

我正在尝试获取这样的消息结果

var eventResponse = eventHelper.Result;// Allways NULL

但始终为空。

有人可以帮帮我吗?

我有一个服务,其中一个方法是 CreateTransportAsync(),在该方法中我调用了 Publish

public async Task<TransportResponseDto> CreateTransportAsync(TransportDto request){

    .
    .
    .
    await this.RaiseTransportCreatedEvent(transportResponseDto);
    }

    private async Task RaiseTransportCreatedEvent(TransportResponseDto transportResponseDto)
    {
        var evt = CreateTransportEvent(transportResponseDto);
        await this.serviceBus.Publish(evt).ConfigureAwait(false);
    }

public class ServiceBus<T> : IServiceBus<T> where T : class
{
    private readonly IBus bus;

    public ServiceBus(IBus bus)
    {
        this.bus = bus;
    }

    public Task Publish(T evt)
    {
        return bus.Publish(evt, evt.GetType());
    }
}

这就是我发布活动的方式,并且它有效。现在我正在尝试测试所有这些是否适用于另一个解决方案中的集成测试,我正在尝试创建一个消费者来使用队列中的消息。然后,我想验证该消息(将其与 json 文件中的假消息进行比较)以查看是否一切正常。问题是我无法收到该消息,我无法理解正在发生的事情。 P.S:我真的不明白你在第 3 点想说什么。 谢谢

【问题讨论】:

    标签: c# rabbitmq integration-testing masstransit


    【解决方案1】:
    1. 您需要通过调用bus.Start() 启动总线。你不这样做,所以无论如何都不会收到任何东西。
    2. 尚不清楚transportClient.CreateTransportAsync 的作用。谁在发布消息?
    3. 消费者被实例化每个消费的消息。您在“测试”中所做的 - 您实例化消费者的一个实例并保持对该实例的引用。然后你在某处发送消息。 MassTransit 会为您的消费者创建一个新实例,您会更新该字段,然后处理该实例。但是您正在检查您最初创建的实例的Result,它从未收到任何消息。永远是null
    4. 将消息从发布者传递给消费者需要时间。您正在尝试在初始化总线后立即检查结果。我敢肯定,即使你修复了 (1)、(2) 和 (3),你也永远不会这么快得到它

    我不确定您到底要测试什么。使用 MassTransit 发布和使用消息适用于所有传输。你可以从 Github 获取任何示例,构建它,运行它并查看它是否正常工作。

    还有许多针对 RabbitMQ 传输的测试,展示了如何创建这样的东西。例如,检查ConsumerBind_Specs.cs 文件。

    此外,如果您想使用一些现有的消费者实例,您可以按照文档Connecting an existing consumer instance 中的说明将此实例连接到总线。使用e.Instance 而不是e.Consumer 将使您的测试工作正确等待Consume 方法完成。但是,这并不是真正流行的方法,因为您确实希望将消费者范围限制为仅处理一个消息。

    【讨论】:

    • 感谢您的回复!我不知道为什么 bus.StartAsync() 会消失,但它就在那里,问题不在于。我想,就像你说的那样,问题是我试图快速访问响应,但我不知道如何等待它。我走了,给你看更多代码,看看你能不能帮到我
    • @EGM 如果您想更新您的问题 - 这样做,而不是为此创建“答案”。本质上,您正在检查一个永远不会更新的对象的属性 - 请参阅第 3 点。我已尝试更详细地解释它。
    猜你喜欢
    • 2022-09-25
    • 2019-10-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多