【问题标题】:How to test pollling pattern in Mutiny on Quarkus?如何在 Quarkus 上的 Mutiny 中测试轮询模式?
【发布时间】:2021-12-18 11:47:13
【问题描述】:

我想测试一个来自https://smallrye.io/smallrye-mutiny/guides/polling 的简单轮询示例,并将服务的数据轮询到 Kafka 流中。

这是我要测试的类的简化示例:

@ApplicationScoped
public class ExampleScheduler {

    @Inject
    @RestClient
    ExapleService service;


    @PostConstruct
    void init() {
        pollSource();
    }

    @Outgoing("sensor_data_out")
    Multi<String> pollSource() {
        Uni<String> stream = service.getString()
                .runSubscriptionOn(Infrastructure.getDefaultExecutor());

        return stream.repeat().withDelay(Duration.ofSeconds(3))
                .indefinitely();
    }
}

这里是测试类:

@QuarkusTest
class ExampleSchedulerTest {
    
    @Inject
    ExampleScheduler classToTest;

    @InjectMock
    ExampleService mockService;

    @BeforeEach
    void setUp() {
        when(mockService.getString()).thenReturn(Uni.createFrom().item("ANSWER"));
    }

    @Test
    void pollSource() {
        final Multi<String> stream = classToTest.pollSource();
        AssertSubscriber<String> subscriber = stream.subscribe().withSubscriber(AssertSubscriber.create(1));
        subscriber.assertCompleted()
                .assertItems("ANSWER");
    }
}

我的实际示例的错误日志是:

我正在尝试依靠 Quarkus 测试容器来提供 Kafka 的实例

java.lang.AssertionError: No completion (or failure) event received in the last 10000 ms

    at io.smallrye.mutiny.helpers.test.AssertSubscriber.awaitCompletion(AssertSubscriber.java:424)
    at io.smallrye.mutiny.helpers.test.AssertSubscriber.awaitCompletion(AssertSubscriber.java:408)
    at si.src.xanathar.cron.RawSensorDataSchedulerTest.pollThinsBoard(RawSensorDataSchedulerTest.java:86)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at io.quarkus.test.junit.QuarkusTestExtension.runExtensionMethod(QuarkusTestExtension.java:922)
    at io.quarkus.test.junit.QuarkusTestExtension.interceptTestMethod(QuarkusTestExtension.java:752)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
    at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
    at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)

【问题讨论】:

    标签: unit-testing apache-kafka vert.x quarkus mutiny


    【解决方案1】:

    为了测试无限的Multi 流,例如调用UniRepeat#indefinitely 产生的流,您可以收集一组预定义的值,然后取消上游订阅。

    这样的行为可以使用selectfirstcollectlast操作符来实现:

    @QuarkusTest
    class ExampleSchedulerTest {
        
        @Inject
        ExampleScheduler classToTest;
    
        @InjectMock
        ExampleService mockService;
    
        @BeforeEach
        void setUp() {
            when(mockService.getString()).thenReturn(Uni.createFrom().item("ANSWER"));
        }
    
        @Test
        void pollSource() {
            String lastValue = classToTest.pollSource()
                    .select().first(1)
                    .collect().last()
                    .await().atMost(Duration.ofSeconds(4)); // wait for 4 seconds to allow time for the configured upstream delay
            assertEquals("ANSWER", lastValue);
        }
    }
    

    【讨论】:

    • 这也是正确的。虽然我想知道像你展示的终止流是否比从@jzimmerli 展示的断言中省略 assertCompleted() 更有优势。
    • 当然。这具有确定性和可预测性的优势,您可以在其中控制可以执行严格断言的排放窗口。这应该是您的方式,而不是断言在不知道在预定义的时间窗口中发出了多少项目或流是否完成或它是否发出任何随机错误的情况下发出了一个值。作为提示,将withDelay 持续时间更改为5 并检查测试是否仍然成功。
    【解决方案2】:

    如果我没看错,你就断言流已经完成:

    subscriber.assertCompleted()
                .assertItems("ANSWER");
    

    但是您的信息流是无穷无尽的,永远不会结束。因此,调用超时事件。 您只需删除 assertCompleted() 术语。您的测试应该会运行。

    【讨论】:

      猜你喜欢
      • 2022-06-22
      • 2020-09-26
      • 2021-12-17
      • 2021-12-17
      • 2022-06-16
      • 2021-07-05
      • 1970-01-01
      • 1970-01-01
      • 2022-12-01
      相关资源
      最近更新 更多