【问题标题】:Integration testing with event sourcing systems与事件溯源系统的集成测试
【发布时间】:2019-05-09 20:44:21
【问题描述】:

我正在开发一个 PoC,我们将 CQRS 与事件溯源结合使用。我们使用 Axon 框架和 Axon 服务器作为工具集。

我们有一些带有一些业务逻辑的微服务(Maven 包)。

应用流程的简单概述:

我们向服务 1 发布一条 xml 消息(使用 REST),这将导致一个事件(使用聚合)。 服务 2 处理由服务 1“触发”的事件并启动一个 saga 流。例如,Sage 流程的一部分是发送邮件消息。

我可以使用 Axon Test 进行一些测试,以测试来自服务 1 的聚合或来自服务 2 的 saga。但是是否有一个很好的选择来进行真正的集成测试,我们首先将消息发布到 REST 接口并检查聚合和saga中的所有操作(包括发送邮件等)

也许这种集成测试有点过头了,最好单独测试每个组件。我怀疑测试这种类型的系统需要什么/最好的解决方案。

【问题讨论】:

    标签: testing integration-testing event-sourcing axon


    【解决方案1】:

    我建议看看 testcontainers (https://www.testcontainers.org/)

    它提供了一种非常方便的方式来在 JUnit 测试中启动和干净地拆除 docker 容器。此功能对于针对真实数据库和任何其他可用 docker 映像 (https://hub.docker.com/r/axoniq/axonserver/) 的资源(例如 Axon 服务器)对应用程序进行集成测试非常有用。

    我正在分享一些来自 JUnit 4 测试类 (Kotlin) 的代码 sn-ps。希望这可以帮助您开始并发展您的特定测试策略(集成应该覆盖更小的范围,然后是端到端测试)。我的观点是集成测试应该分别/独立地关注 Axon 消息传递 API 组件和 REST API 组件。端到端应涵盖微服务中的所有组件。

    @RunWith(SpringRunner::class)
    @SpringBootTest
    @ContextConfiguration(initializers = [DrestaurantCourierCommandMicroServiceIT.Initializer::class])
    internal class DrestaurantCourierCommandMicroServiceIT {
    
        @Autowired
        lateinit var eventStore: EventStore
    
        @Autowired
        lateinit var commandGateway: CommandGateway
    
        companion object {
    
            // An Axon Server container
            @ClassRule
            @JvmField
            var axonServerTestContainer = KGenericContainer(
                    "axoniq/axonserver")
                    .withExposedPorts(8024, 8124)
                    .waitingFor(Wait.forHttp("/actuator/info").forPort(8024))
                    .withStartupTimeout(Duration.of(60L, ChronoUnit.SECONDS))
    
            // A PostgreSQL container is being started up using a JUnit Class Rule which gets triggered before any of the tests are run:
    
            @ClassRule
            @JvmField
            var postgreSQLContainer = KPostgreSQLContainer(
                    "postgres:latest")
                    .withDatabaseName("drestaurant")
                    .withUsername("demouser")
                    .withPassword("thepassword")
                    .withStartupTimeout(Duration.of(60L, ChronoUnit.SECONDS))
        }
    
        // Pass details on the application as properties BEFORE Spring starts creating a test context for the test to run in:
        class Initializer : ApplicationContextInitializer<ConfigurableApplicationContext> {
    
            override fun initialize(configurableApplicationContext: ConfigurableApplicationContext) {
                val values = TestPropertyValues.of(
                        "spring.datasource.url=" + postgreSQLContainer.jdbcUrl,
                        "spring.datasource.username=" + postgreSQLContainer.username,
                        "spring.datasource.password=" + postgreSQLContainer.password,
                        "spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQLDialect",
                        "axon.axonserver.servers=" + axonServerTestContainer.containerIpAddress + ":" + axonServerTestContainer.getMappedPort(8124)
                )
                values.applyTo(configurableApplicationContext)
            }
        }
    
    
        @Test
        fun `restaurant command microservice integration test - happy scenario`() {
    
            val who = "johndoe"
            val auditEntry = AuditEntry(who, Calendar.getInstance().time)
            val maxNumberOfActiveOrders = 5
            val name = PersonName("Ivan", "Dugalic")
            val orderId = CourierOrderId("orderId")
    
            // ******* Sending the `createCourierCommand` ***********
            val createCourierCommand = CreateCourierCommand(name, maxNumberOfActiveOrders, auditEntry)
            commandGateway.sendAndWait<Any>(createCourierCommand)
            await withPollInterval org.awaitility.Duration.ONE_SECOND atMost org.awaitility.Duration.FIVE_SECONDS untilAsserted {
                val latestCourierCreatedEvent = eventStore.readEvents(createCourierCommand.targetAggregateIdentifier.identifier).asStream().toList().last().payload as CourierCreatedEvent
                assertThat(latestCourierCreatedEvent.name).isEqualTo(createCourierCommand.name)
                assertThat(latestCourierCreatedEvent.auditEntry.who).isEqualTo(createCourierCommand.auditEntry.who)
                assertThat(latestCourierCreatedEvent.maxNumberOfActiveOrders).isEqualTo(createCourierCommand.maxNumberOfActiveOrders)
            }
    
            // ******* Sending the `createCourierOrderCommand` **********
            val createCourierOrderCommand = CreateCourierOrderCommand(orderId, auditEntry)
            commandGateway.sendAndWait<Any>(createCourierOrderCommand)
            await withPollInterval org.awaitility.Duration.ONE_SECOND atMost org.awaitility.Duration.FIVE_SECONDS untilAsserted {
                val latestCourierOrderCreatedEvent = eventStore.readEvents(createCourierOrderCommand.targetAggregateIdentifier.identifier).asStream().toList().last().payload as CourierOrderCreatedEvent
                assertThat(latestCourierOrderCreatedEvent.aggregateIdentifier.identifier).isEqualTo(createCourierOrderCommand.targetAggregateIdentifier.identifier)
                assertThat(latestCourierOrderCreatedEvent.auditEntry.who).isEqualTo(createCourierOrderCommand.auditEntry.who)
            }
    
            // ******* Assign the courier order to courier **********
            val assignCourierOrderToCourierCommand = AssignCourierOrderToCourierCommand(orderId, createCourierCommand.targetAggregateIdentifier, auditEntry)
            commandGateway.sendAndWait<Any>(assignCourierOrderToCourierCommand)
            await withPollInterval org.awaitility.Duration.ONE_SECOND atMost org.awaitility.Duration.FIVE_SECONDS untilAsserted {
                val latestCourierOrderAssignedEvent = eventStore.readEvents(assignCourierOrderToCourierCommand.targetAggregateIdentifier.identifier).asStream().toList().last().payload as CourierOrderAssignedEvent
                assertThat(latestCourierOrderAssignedEvent.aggregateIdentifier.identifier).isEqualTo(assignCourierOrderToCourierCommand.targetAggregateIdentifier.identifier)
                assertThat(latestCourierOrderAssignedEvent.auditEntry.who).isEqualTo(assignCourierOrderToCourierCommand.auditEntry.who)
                assertThat(latestCourierOrderAssignedEvent.courierId.identifier).isEqualTo(assignCourierOrderToCourierCommand.courierId.identifier)
            }
    
        }
    }
    
    class KGenericContainer(imageName: String) : GenericContainer<KGenericContainer>(imageName)
    class KPostgreSQLContainer(imageName: String) : PostgreSQLContainer<KPostgreSQLContainer>(imageName)
    

    【讨论】:

    • 这个项目的完整代码可以在 github 上找到吗?
    【解决方案2】:

    Axon 开发人员建议使用here 中提到的 docker 解决方案。 Testcontainers 似乎是这里最好的。 我的 java sn-p:

    @ActiveProfiles("test")
    public class TestContainers {
    
        private static final int AXON_HTTP_PORT = 8024;
        private static final int AXON_GRPC_PORT = 8124;
    
        public static void startAxonServer() {
            GenericContainer axonServer = new GenericContainer("axoniq/axonserver:latest")
                    .withExposedPorts(AXON_HTTP_PORT, AXON_GRPC_PORT)
                    .waitingFor(
                            Wait.forLogMessage(".*Started AxonServer.*", 1)
                    );
            axonServer.start();
    
            System.setProperty("ENV_AXON_GRPC_PORT", String.valueOf(axonServer.getMappedPort(AXON_GRPC_PORT)));
        }
    

    在您的@BeforeClass 中调用startAxonServer 方法。现在您必须获取外部 docker 端口(@98​​7654326@ 中指出的这些端口是 docker-internal)。 您可以通过getMappedPort 在运行时执行此操作,如我的 sn-p 所示。请记住为您的测试套件提供连接配置。我在 Spring Boot 上的示例如下:

    axon:
      axonserver:
        servers: localhost:${ENV_AXON_GRPC_PORT}
    

    可以在我的github project 上找到完整的工作解决方案。

    【讨论】:

      猜你喜欢
      • 2021-10-01
      • 2012-08-06
      • 2016-07-25
      • 2022-09-24
      • 2017-12-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多