【问题标题】:Consumer not getting called when unit testing MassTransit单元测试 MassTransit 时未调用消费者
【发布时间】:2019-05-14 15:27:19
【问题描述】:

我正在尝试使用 MassTransit 编写单元测试。在线查看时,我发现访问总线的最佳方法是使用InMemoryTestHarness 创建它。我添加了我的消费者和 PublishObserver 以获得结果行为。

在下面的示例中,我将TestRequest 消息发送到总线,然后我的消费者读取请求并将TestResponse 消息放回总线上。最后,观察者得到响应。

我不知道问题是否与我缺少的某些配置有关,或者是否有一些我没有等待的任务,但请求消息甚至从未到达消费者。

我错过了什么?

测试

[TestMethod]
public void RequestResponseBusTest()
{
    var harness = new InMemoryTestHarness();

    var consumer = new TestConsumer();
    harness.OnConfigureInMemoryBus += c =>
    {
        c.ReceiveEndpoint("testqueue", e =>
            e.Consumer(() => consumer));
    };

    var observer = new TestPublishObserver();
    harness.OnConnectObservers += c =>
    {
        c.ConnectPublishObserver(observer);
    };

    harness.Start().Wait();
    var bus = harness.Bus;

    bus.Publish(new TestRequest() { X = 99 }).Wait();

    Assert.AreEqual(1, consumer.ConsumedMessages.Count, "consumed");
    Assert.AreEqual(1, observer.PublishedRequests.Count, "requests");
    Assert.AreEqual(1, observer.PublishedResponses.Count, "responses");

}

和支持类

[Serializable]
public class TestRequest
{
    public int X { get; set; }
}

[Serializable]
public class TestResponse
{
    public int Y { get; set; }
}

public class TestConsumer : IConsumer<TestRequest>
{
    public List<TestRequest> ConsumedMessages { get; } = new List<TestRequest>();

    public Task Consume(ConsumeContext<TestRequest> context)
    {
        ConsumedMessages.Add(context.Message);
        context.Publish(new TestResponse() { Y = 123 }).Wait();
        return Task.CompletedTask;
    }
}

private class TestPublishObserver : IPublishObserver
{
    public List<TestRequest> PublishedRequests { get; } = new List<TestRequest>();
    public List<TestResponse> PublishedResponses { get; } = new List<TestResponse>();

    public Task PrePublish<T>(PublishContext<T> context) where T : class
    {
        return Task.CompletedTask;
    }

    public Task PostPublish<T>(PublishContext<T> context) where T : class
    {
        var msg = context.Message;

        if (msg is TestRequest)
            PublishedRequests.Add((TestRequest)(object)msg);

        if (msg is TestResponse)
            PublishedResponses.Add((TestResponse)(object)msg);

        return Task.CompletedTask;
    }

    public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
    {
        return Task.CompletedTask;
    }
}

【问题讨论】:

  • 我认为了解消费者安全带的使用方式会有所帮助:github.com/MassTransit/MassTransit/blob/develop/src/…
  • 我知道我没有按照它的本意使用它。实际上,我正在做的是对 EventFlow 应用程序的一些端到端测试,我在 IBusControl 上放置一条消息并观察读取模型的状态。为此,我必须等待所有消费者在我开始评估之前结束,我没有找到简单的方法来做到这一点。
  • 您可以使用活动监视器:stackoverflow.com/a/56044328/1882
  • 我会调查的,谢谢。

标签: c# unit-testing masstransit


【解决方案1】:

你需要在 bus.Publish(new TestRequest() { X = 99 }).Wait(); 之后添加 Thread.Sleep(2000) bus.Publish 不保证消息传递。当你调用 Wait() 方法时,你只是等待它被发送,而不是被处理

或者!!!

[TestMethod]
public void RequestResponseBusTest()
{
    var harness = new InMemoryTestHarness();

    var consumer = new TestConsumer();
    harness.OnConfigureInMemoryBus += c =>
    {
        c.ReceiveEndpoint("testqueue", e =>
            e.Consumer(() => consumer));
    };

    var observer = new TestPublishObserver();
    harness.OnConnectObservers += c =>
    {
        c.ConnectPublishObserver(observer);
    };

    harness.Start().Wait();
    var bus = harness.Bus;

    bus.Publish(new TestRequest() { X = 99 }).Wait();

    //add this line
    var receivedMessage = harness.Consumed.Select<TestRequest>().FirstOrDefault();

    Assert.AreEqual(1, consumer.ConsumedMessages.Count, "consumed");
    Assert.AreEqual(1, observer.PublishedRequests.Count, "requests");
    Assert.AreEqual(1, observer.PublishedResponses.Count, "responses");

}

【讨论】:

  • 我认为您提供的第二种方法可以解决我的问题。我必须进行一些实验,但我认为我可以在我的测试中编写一个通用的“等待所有已发布的消息”方法。谢谢!
猜你喜欢
  • 1970-01-01
  • 2019-09-02
  • 1970-01-01
  • 1970-01-01
  • 2022-01-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多