【问题标题】:Disable Kafka connection in SpringBoot tests在 Spring Boot 测试中禁用 Kafka 连接
【发布时间】:2023-01-23 08:17:40
【问题描述】:

我正在开发一个遵循微服务架构的 springboot 项目,我使用 Kafka 作为事件总线在它们之间交换数据。我还有 Junit 测试,它测试我的应用程序的某些部分,这些部分不需要总线,而其他部分则需要使用嵌入式 Kafka 代理。

我遇到的问题是,当我启动所有测试时,它们花费了太多时间并且失败了,因为每个测试都试图连接到嵌入式 Kafka 代理(连接不可用) 而他们不需要 Kafka 总线来完成他们的任务。

是否可以为这些测试禁用 Kafka 组件的加载,而只允许它们用于需要它的组件?

【问题讨论】:

    标签: java spring-boot apache-kafka spring-kafka


    【解决方案1】:

    这就是我通常编写 JUnit 测试类的方式,通常不会为每个测试连接到 KAFKA Brokers。

    1. 模拟 REST API,如果您的 KAFKA 客户端(生产者/消费者)与 REST API 集成

      公共类 MyMockedRESTAPI {

      public MyMockedRESTAPI() {
      }
      
      public APIResponseWrapper apiResponseWrapper(parameters..) throws RestClientException {
          if (throwException) {
              throw new RestClientException(....);
          }
          return new APIResponseWrapper();
      }
      

      }

      1. 用于生成传入 KAFKA 事件和 REST API 请求和响应包装器的工厂类

      公共类模拟工厂{

      private static final Gson gson = new Gson();
      
      public static KAKFAEvent generateKAFKAEvent() {
          KAKFAEvent kafkaEvent = new KAKFAEvent();
          kafkaEvent.set...
          kafkaEvent.set...
          kafkaEvent.set...
      
          return KAKFAEvent;
      }
      
      
      public static ResponseEntity<APIResponse> createAPIResponse() {
          APIResponse response = new APIResponse();
          return new ResponseEntity<>(response, HttpStatus.OK);
      }
      

      }

      1. 测试运行器类

        @RunWith(SpringJUnit4ClassRunner.class)

        公共课 KAFKAJUnitTest {

        你的断言应该在这里声明

        }

      也可以参考:https://www.baeldung.com/spring-boot-kafka-testing

    【讨论】:

      【解决方案2】:

      一个好的做法是在隔离的微服务范围内测试代码时避免向 Kafka 发送消息。但是当您需要进行集成测试(同时进行许多微服务)时,有时您需要激活 Kafka 消息。

      所以我的目的是:

      1- 根据需要激活/停用 Loding Kafka 配置

      @ConditionalOnProperty(prefix = "my.kafka.consumer", value = "enabled", havingValue = "true", matchIfMissing = false)
      @Configuration
      public class KafkaConsumerConfiguration {
      ...
      }
      
      @ConditionalOnProperty(prefix = "my.kafka.producer", value = "enabled", havingValue = "true", matchIfMissing = false)
      @Configuration
      public class KafkaProducerConfiguration {
      ...
      }
      

      然后你将能够根据需要激活/停用加载消费者和生产者......

      例子 :

      @SpringBootApplication
      @Import(KafkaConsumerConfiguration.class)
      public class MyMicroservice_1 {
          public static void main(String[] args) {
              SpringApplication.run(MyMicroservice_1.class, args);
          }
      }
      

      或者

      @SpringBootApplication
      @Import(KafkaProducerConfiguration.class)
      public class MyMicroservice_2 {
          public static void main(String[] args) {
              SpringApplication.run(MyMicroservice_2.class, args);
          }
      }
      

      或者可能需要两种配置的微服务

      @SpringBootApplication
      @Import(value = { KafkaProducerConfiguration.class, KafkaConsumerConfiguration.class })
      public class MyMicroservice_3 {
          public static void main(String[] args) {
              SpringApplication.run(MyMicroservice_3.class, args);
          }
      }
      

      2 - 您还需要根据当前的弹簧配置文件发送消息。为此,您可以覆盖 Kafka 模板对象的发送方法:

      @ConditionalOnProperty(prefix = "my.kafka.producer", value = "enabled", havingValue = "true", matchIfMissing = false)
      @Configuration
      public class KafkaProducerConfiguration {
      ...
      
          @Resource
          Environment environment;
      
          @Bean
          public KafkaTemplate<String, String> kafkaTemplate() {
              return new KafkaTemplate<>(producerFactory()) {
      
                  @Override
                  protected ListenableFuture<SendResult<String, String>> doSend(ProducerRecord<String, String> producerRecord) {
      
                      if (Arrays.asList(environment.getActiveProfiles()).contains("test")) {
                              return null;            
                      }
      
                      return super.doSend(producerRecord);
                  }
      
              };
          }
          
              @Bean
          public ProducerFactory<String, String> producerFactory() {
              Map<String, Object> props = new HashMap<>();
              ...
      
              return new DefaultKafkaProducerFactory<>(props);
          }
          
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2020-07-21
        • 1970-01-01
        • 2018-02-08
        • 2020-02-15
        • 2019-08-25
        • 1970-01-01
        • 1970-01-01
        • 2016-01-02
        相关资源
        最近更新 更多