【问题标题】:How to write unit test case for application using Kafka Streams如何使用 Kafka Streams 为应用程序编写单元测试用例
【发布时间】:2021-08-21 14:52:06
【问题描述】:

我们正在构建一个将使用 Kafka Streams 的应用程序。我们正在寻找示例示例,该示例向我们展示了如何为场景编写测试用例,其中我们从 Kafka 拓扑中调用外部服务。 基本上需要以某种方式模拟外部调用,因为服务可能不会一直运行。我们正在使用 TopologyTestDriver 编写测试用例。由于这个外部调用,我们的测试用例没有执行。出现错误:org.springframework.web.client.ResourceAccessException:对“http://localhost:8080/endPointName”的 POST 请求出现 I/O 错误:连接被拒绝:连接;嵌套异常是 java.net.ConnectException: Connection denied: connect

我们要为其编写测试用例的示例代码:

@Bean
public RestTemplate restTemplate() {
    return new RestTemplate();
}

public void method(StreamsBuilder builder) {
    builder.stream(inTopic,Consumed.with(StreamsSerdes.String(),new StreamsSerdes.CustomSerde()))
        .peek((s, customObj) -> LOG.info(customObj))
        .mapValues(this::getResult)
        .peek((s, result) -> LOG.info(result))
        .to(outputTopic,Produced.with(StreamsSerdes.String(),new ResultSerde()));
}

private Result getResult(Custom customObj) {
    HttpHeaders headers = new HttpHeaders();
    headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
    HttpEntity<Custom> request = new HttpEntity<>(customObj, headers);
    return restTemplate.postForEntity(restCompleteUri, request, Result.class).getBody();
}

示例测试用例示例:

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@SpringBootTest
public class TopologyTest {
    private static TopologyTestDriver topologyTestDriver;
    private static final Logger LOG = LogManager.getLogger();

@Autowired
private ConfigurableApplicationContext appContext;

@BeforeAll
void setUp() {
    Properties properties = getProperties();

    StreamsBuilder builder = new StreamsBuilder();
    appContext.getBean(PublisherSubscriberTopology.class).withBuilder(builder);
    Topology topology = builder.build();

    topologyTestDriver = new TopologyTestDriver(topology, properties);
}

private Properties getProperties() {
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9092");
    properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogDeserializationExceptionHandler.class.getName());
    properties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProductionExceptionHandler.class.getName());
    return properties;
}

@Test
void testAppFlow() {
    Custom customObj = getCustomObj();
    Result result = getResult();

    ConsumerRecordFactory<String, Custom> resultFactory =
            new ConsumerRecordFactory<>(inTopic,
                    StreamsSerdes.String().serializer(), StreamsSerdes.CustomSerde().serializer());

    topologyTestDriver.pipeInput(resultFactory.create(
            inTopic,
            "1001",
            customObj
    ));

    ProducerRecord<String, Result> record =
            topologyTestDriver.readOutput(
                    outputTopic,
                    StreamsSerdes.String().deserializer(),
                    StreamsSerdes.ResultSerde().deserializer()
            );

    assertAll(() -> assertEquals("abc123", record.value().getABC()));
}

private Custom getCustomObj() {
    Custom customObj = new Custom();
    //setting customObj
    return customObj;
}

private Result getResult() {
    Result result = new Result();
    //setting resultObj
    return result;
}

@AfterAll
static void tearDown() {
    try {
        topologyTestDriver.close();
    } catch (Exception e) {
        LOG.error(e.getMessage());
    }
}

}

【问题讨论】:

    标签: java spring-mvc apache-kafka apache-kafka-streams


    【解决方案1】:

    在这种特殊情况下,考虑重构现有代码 - 将对 HTTP 的调用抽象到某个接口并模拟它。由于您无论如何都在使用spring,因此注入将与HTTP一起使用的bean,而不是调用

    public void method(StreamsBuilder builder) {
        builder.stream(inTopic,Consumed.with(StreamsSerdes.String(),new StreamsSerdes.CustomSerde()))
            .peek((s, customObj) -> LOG.info(customObj))
            .mapValues(this::getResult)
            .peek((s, result) -> LOG.info(result))
            .to(outputTopic,Produced.with(StreamsSerdes.String(),new ResultSerde()));
    }
    
    private Result getResult(Custom customObj) {
       ... HTTP call here ...
    }  
    

    使用这样的东西:

    
    class Whatever {
      @Autowired
      private HttpFacade httpFacade;
    
      public void method(StreamsBuilder builder) {...}
    
      private Result getResult(Custom customObj) {
          // httpFacade is int
          httpFacade.callRemoteService(customObj);
       }
    }
    
    @Component
    class HttpFacade {
       public ABC callRemoteService(CustomObj) {
         ... here comes the code that works with HttpClient
       }
    }
    

    使用此设置,您可以在测试中模拟 HttpFacade(如果您在没有 spring 的情况下运行单元测试,则使用 @MockBean 或普通模拟) 并指定期望。

    这是给你的具体案例。

    一般来说,如果您必须测试 Http 请求是否填充了正确的 URL、标头、正文等。您可以使用 WireMock。

    对于 Kafka Streams,由于它是 Kafka 的客户端库,您可以在测试前启动 Kafka docker Test Container(可能还有 Zookeeper),设置它以创建所需的主题,然后就可以开始了。

    如果您想测试真正的 kafka 交互并且确实想确保消息到达 kafka 主题,然后被您的消费者消费等,这很有意义,换句话说,更复杂的情况。

    如果您使用的是 spring kafka,也可以选择使用 Embedded Kafka,但我不确定它是否适用于 kafka 流 - 您应该尝试一下,但至少它是一个“可行”的方向

    更新(基于 op 的评论):

    在 spring 驱动测试中使用模拟 bean 时,您必须指定对该 bean 的期望:

    
    @SpringBootTest // or whatever configuration you have to run spring driven test here
    public class MyTest {
    
       @MockBean
       private HttpFacade httpFacade;
    
       @Test
       public void myTest() {
         Mockito.when(httpFacade).callRemoteService(eq(<YOUR_EXPECTED_CUSTOM_OBJECT_1>)).thenReturn(<EXPECTED_RESPONSE_1);
    ... its possible to specify as many expectations as you wish...
    
         ... run the test code here that you probably already have...
       }
    }
    
    

    关键是您实际上并不需要进行 HTTP 调用来测试您的 kafka 流代码!

    【讨论】:

    • 您能否分享/告诉修改后的测试用例在这种情况下将如何变成(在我的测试类-TopologyTest 中)。基本上如何在 TopologyTest 的测试用例中使用 Mock 端点?
    • @vivek075 - 我已经更新了答案,请阅读并告诉我它是否适合您的需求
    • 感谢您的回答。问题仍然存在,因为当执行将调用休息端点的拓扑时,瓶颈就在那里。如何触发模拟的休息端点。如果我不进行 http 调用(模拟),那么如何对 kafka 流进行单元测试;作为 rest 端点返回的结果将被推送到输出主题。
    • 这正是模拟的重点。在这种情况下,如果您关心术语,那么准确地说是存根。它用于指定期望。因此,您“模拟”来自远程 HTTP 服务器的答案,这些答案无论如何都超出了您的测试上下文。您无需测试 HTTP 服务器是否正常工作,而是测试 kafka 流代码是否根据存根 HTTP 调用的结果执行其需要执行的操作。
    • 拓扑执行时,该存根将如何注入,它不会执行实际的rest端点吗?如果你能告诉我我们如何修改我的@Test 案例以利用那个模拟端点?
    【解决方案2】:

    该问题与 Kafka Streams 无关。您依赖于拓扑中间的 HTTP 客户端(顺便说一下,不推荐),因此您需要询问如何测试它。

    您需要注入一个模拟的RestTemplaterestCompleteUri 变量来与一些假HTTP 端点进行通信。

    例如,WireMock 或查看这些帖子

    【讨论】:

    • 你能告诉我推荐的方法是什么。基本上它是一个无状态的 Kafka 流;所以我们不能在 KTable 中维护任何东西。我们调用的其余端点只是一个转换器,它将输入转换为输出。我也知道我们需要模拟休息端点,但如何嵌入到 kafka 拓扑中。
    • TopologyTestDriver 应该为您处理,并且与 Rest 客户端无关,或者是无状态/有状态,正如我已经回答的那样。如果它“只是一个转换”,那么你应该在本地 JVM 中引入必要的代码来完全做到这一点,而不是使用外部 HTTP 端点
    • TopologyTestDriver 应该为您处理,并且与 Rest 客户端无关 - 如何??
    • StreamsBuilder 和 RestTemplate 是相互独立实例化的独立对象。你唯一的要求是两者都是非空的,但两者都可以被模拟/伪造
    • 那么我应该假设它目前无法实现,还是 Kafka Streams 没有提供足够的功能来为这些场景编写单元测试用例?我对 Kafka Streams 很陌生。因此提出这些问题。如果您发现我的理解不正确,我们深表歉意。如果可行,请纠正我?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-19
    相关资源
    最近更新 更多