【发布时间】:2022-06-15 16:15:40
【问题描述】:
我正在使用 spring cloud 对比度来测试我的服务的 kafkaListsener。 我将 EmbeddedKafka 与 Spring Cloud 合约一起使用。
我服务中的监听器由 spring kafka 中的@KafkaListener 实现。
我的合同测试如下:
@EmbeddedKafka
@SpringBootTest(classes = {ServiceApplication.class},
properties = {"stubrunner.kafka.enabled=true",
"stubrunner.stream.enabled=false",
"spring.cloud.stream.function.autodetect=false"})
public class EventContractTest {
@Autowired
private StubTrigger stubTrigger;
@SpyBean
@Qualifier("KafkaEventListener")
private EventListener eventListener;
@BeforeEach
public void setup() throws ExecutionException, InterruptedException {
Mockito.doNothing().when(eventListener).onEventReceived(any(), any());
}
@Test
public void kafkaEventTest() throws ExecutionException, InterruptedException {
stubTrigger.trigger("kafka-event");
ArgumentCaptor<Event> eventArgumentCaptor = ArgumentCaptor.forClass(Event.class);
ArgumentCaptor<Bytes> headerArgumentCaptor = ArgumentCaptor.forClass(Bytes.class);
verify(eventListener, timeout(5000).times(1)).
onEventReceived(eventArgumentCaptor.capture(), headerArgumentCaptor.capture());
....
}
@Test
public void kafkaEventTest2() throws ExecutionException, InterruptedException {
stubTrigger.trigger("kafka-event-2");
ArgumentCaptor<Event> eventArgumentCaptor = ArgumentCaptor.forClass(Event.class);
ArgumentCaptor<Bytes> headerArgumentCaptor = ArgumentCaptor.forClass(Bytes.class);
verify(eventListener, timeout(5000).times(1)).
onEventReceived(eventArgumentCaptor.capture(), headerArgumentCaptor.capture());
....
}
}
在运行测试用例时,大多数情况下它会通过,但它会随机失败,并出现以下异常:
[2022-05-06T09:53:52.883Z] [错误] kafkaEventTest 已用时间:30.177 s
[2022-05-06T09:53:52.883Z] [错误] 测试运行:4,失败:1,错误: 1,跳过:0,经过时间:212.675 秒 , [2022-05-06T09:53:52.883Z] [2022-05-06T09:53:52.883Z]); [2022-05-06T09:53:52.883Z] 想要 1 次:[2022-05-06T09:53:52.883Z] -> 在 com.messaging.kafka.listener.EventListener.onEventReceived(EventListener.java:49) [2022-05-06T09:53:52.883Z] 但是是 2 次:[2022-05-06T09:53:52.883Z] -> 在 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 方法)[2022-05-06T09:53:52.883Z] -> 在 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 方法)[2022-05-06T09:53:52.883Z] [2022-05-06T09:53:52.883Z]
从日志中它是由 stubtrigger.trigger() 方法抛出的。这是嵌入式kafka的bug吗?
结果,第一个测试用例由于超时异常而失败。第二个测试用例也失败了,因为监听器方法已经被消费了两次,也就是说第一个用例触发的事件已经被消费了。
【问题讨论】:
-
我们可以有更多的堆栈跟踪来确定超时的原因吗?
-
@ArtemBilan 我编辑了问题并粘贴了我拥有的整个堆栈跟踪。它只是没有其他有价值的信息。超时异常在第一次合约测试中被抛出并且失败,第二次也因为发现事件被消费了两次而失败
-
你确定超时异常是触发器未验证的结果吗?
-
是的。从日志看,是行stubtrigger throw this exception。
-
存根触发器也可以从其实现中抛出超时异常。可以在类中找到代码:KafkaStubMessages
标签: java spring-boot apache-kafka spring-kafka spring-cloud-contract