【问题标题】:Embedded Kafka contract tests randomly failing嵌入式 Kafka 合约测试随机失败
【发布时间】:2022-06-15 16:15:40
【问题描述】:

我正在使用 spring cloud 对比度来测试我的服务的 kafkaListsener。 我将 EmbeddedKafka 与 Spring Cloud 合约一起使用。

我服务中的监听器由 spring kafka 中的@KafkaListener 实现。

我的合同测试如下:

    
@EmbeddedKafka
@SpringBootTest(classes = {ServiceApplication.class},
        properties = {"stubrunner.kafka.enabled=true",
                "stubrunner.stream.enabled=false",
                "spring.cloud.stream.function.autodetect=false"})
public class EventContractTest {

    @Autowired
    private StubTrigger stubTrigger;

    @SpyBean
    @Qualifier("KafkaEventListener")
    private EventListener eventListener;

    @BeforeEach
    public void setup() throws ExecutionException, InterruptedException {   
        Mockito.doNothing().when(eventListener).onEventReceived(any(), any());
    }
    
    @Test
    public void kafkaEventTest() throws ExecutionException, InterruptedException {
        stubTrigger.trigger("kafka-event");

        ArgumentCaptor<Event> eventArgumentCaptor = ArgumentCaptor.forClass(Event.class);
        ArgumentCaptor<Bytes> headerArgumentCaptor = ArgumentCaptor.forClass(Bytes.class);
        verify(eventListener, timeout(5000).times(1)).
                onEventReceived(eventArgumentCaptor.capture(), headerArgumentCaptor.capture());
  ....
    }

   @Test
    public void kafkaEventTest2() throws ExecutionException, InterruptedException {
        stubTrigger.trigger("kafka-event-2");

        ArgumentCaptor<Event> eventArgumentCaptor = ArgumentCaptor.forClass(Event.class);
        ArgumentCaptor<Bytes> headerArgumentCaptor = ArgumentCaptor.forClass(Bytes.class);
        verify(eventListener, timeout(5000).times(1)).
                onEventReceived(eventArgumentCaptor.capture(), headerArgumentCaptor.capture());
    ....
    }
}

在运行测试用例时,大多数情况下它会通过,但它会随机失败,并出现以下异常:

[2022-05-06T09:53:52.883Z] [错误] kafkaEventTest 已用时间:30.177 s

[2022-05-06T09:53:52.883Z] [错误] 测试运行:4,失败:1,错误: 1,跳过:0,经过时间:212.675 秒 , [2022-05-06T09:53:52.883Z] [2022-05-06T09:53:52.883Z]); [2022-05-06T09:53:52.883Z] 想要 1 次:[2022-05-06T09:53:52.883Z] -> 在 com.messaging.kafka.listener.EventListener.onEventReceived(EventListener.java:49) [2022-05-06T09:53:52.883Z] 但是是 2 次:[2022-05-06T09:53:52.883Z] -> 在 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 方法)[2022-05-06T09:53:52.883Z] -> 在 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 方法)[2022-05-06T09:53:52.883Z] [2022-05-06T09:53:52.883Z]

从日志中它是由 stubtrigger.trigger() 方法抛出的。这是嵌入式kafka的bug吗?

结果,第一个测试用例由于超时异常而失败。第二个测试用例也失败了,因为监听器方法已经被消费了两次,也就是说第一个用例触发的事件已经被消费了。

【问题讨论】:

  • 我们可以有更多的堆栈跟踪来确定超时的原因吗?
  • @ArtemBilan 我编辑了问题并粘贴了我拥有的整个堆栈跟踪。它只是没有其他有价值的信息。超时异常在第一次合约测试中被抛出并且失败,第二次也因为发现事件被消费了两次而失败
  • 你确定超时异常是触发器未验证的结果吗?
  • 是的。从日志看,是行stubtrigger throw this exception。
  • 存根触发器也可以从其实现中抛出超时异常。可以在类中找到代码:KafkaStubMessages

标签: java spring-boot apache-kafka spring-kafka spring-cloud-contract


【解决方案1】:

为了测试集成,我之前按顺序执行了以下步骤,得到了答案:

第 1 步:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.2.6.RELEASE</version>
    <scope>test</scope>
</dependency>

第 2 步:

@Component
public class KafkaProducer {
  private KafkaTemplate<String,String> kafkaTemplate;
  public void send(String topic,String payload){
    kafkaTemplate.send( topic,payload );
  }
}

第 3 步:

@Component
public class KafkaConsumer {
  String payload;
  @KafkaListener(topics ="test-topic",groupId = "test")
  public void receive(ConsumerRecord<?,?> consumerRecord){
  setPayload(consumerRecord.toString());
  }
  public void setPayload(String p){
    payload=p;
  }
  public String getPayload(){
    return payload;
  }
  
}

第四步:

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9094", "port" +
"=9094" })
public class EmbeddedKafkaIntegrationTest {
  @Autowired
  private KafkaConsumer consumer;
  @Autowired
  private KafkaProducer producer;
  private String topic="test-topic";
  @Test
  public void test_integration ()
  throws Exception {
    producer.send(topic, "Sending for test integration");
    Assert.assertTrue(consumer.getPayload().contains( "test-topic" ));
  }
}

【讨论】:

  • 我的情况不同,我正在尝试为我们的应用程序测试合同。我必须使用 stubtrigger 来触发属于另一个应用程序的事件。我在 stubtrigger 旁边没有看到任何提醒
  • 看看@DirtiesContext 能不能帮上忙。没有可重复的样本,很难提供帮助,抱歉
猜你喜欢
  • 1970-01-01
  • 2023-03-10
  • 2020-03-13
  • 2016-01-31
  • 2021-06-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多