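【Posted】:2021-08-21 14:52:06
【Question】:
We are building an application that uses Kafka Streams. We are looking for an example that shows how to write test cases for a scenario in which an external service is called from within the Kafka topology. Essentially, the external call needs to be mocked somehow, because the service may not be running all the time. We are writing the test cases with TopologyTestDriver. Because of this external call, our test cases fail with the following error: org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8080/endPointName": Connection refused: connect; nested exception is java.net.ConnectException: Connection refused: connect
Sample code for which we want to write a test case: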
@Bean
public RestTemplate restTemplate() {
    return new RestTemplate();
}

public void method(StreamsBuilder builder) {
    builder.stream(inTopic, Consumed.with(StreamsSerdes.String(), new StreamsSerdes.CustomSerde()))
        .peek((s, customObj) -> LOG.info(customObj))
        .mapValues(this::getResult)
        .peek((s, result) -> LOG.info(result))
        .to(outputTopic, Produced.with(StreamsSerdes.String(), new ResultSerde()));
}

private Result getResult(Custom customObj) {
    HttpHeaders headers = new HttpHeaders();
    headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
    HttpEntity<Custom> request = new HttpEntity<>(customObj, headers);
    return restTemplate.postForEntity(restCompleteUri, request, Result.class).getBody();
}
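One possible way to make `getResult` testable without the real service is to hide the HTTP call behind a small interface and pass it into the topology class, so that a test can supply a stub. The following is a minimal, library-free sketch of that idea, not the application's actual code: `ResultClient`, `ResultMapper`, and the simplified `Custom` and `Result` classes are hypothetical stand-ins. In the real application the production implementation of the interface would presumably wrap the `restTemplate.postForEntity` call, and `mapValues` would delegate to the mapper.

```java
// Hypothetical, simplified stand-ins for the application's Custom and Result types.
final class Custom {
    final String id;
    Custom(String id) { this.id = id; }
}

final class Result {
    private final String abc;
    Result(String abc) { this.abc = abc; }
    String getABC() { return abc; }
}

// Hypothetical seam: the only place that knows how the external service is reached.
interface ResultClient {
    Result fetch(Custom customObj);
}

// Holds the per-record logic that the topology would pass to mapValues(...).
final class ResultMapper {
    private final ResultClient client;

    ResultMapper(ResultClient client) { this.client = client; }

    Result getResult(Custom customObj) {
        return client.fetch(customObj);
    }
}

final class StubbedClientDemo {
    public static void main(String[] args) {
        // In a test, a lambda replaces the HTTP-backed implementation.
        ResultClient stub = customObj -> new Result("abc123");
        ResultMapper mapper = new ResultMapper(stub);
        System.out.println(mapper.getResult(new Custom("1001")).getABC());
    }
}
```

With such a seam in place, the topology built for TopologyTestDriver never opens a network connection, because the stub returns a canned `Result` for every record.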
Sample test case:
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@SpringBootTest
public class TopologyTest {
    private static TopologyTestDriver topologyTestDriver;
    private static final Logger LOG = LogManager.getLogger();

    @Autowired
    private ConfigurableApplicationContext appContext;

    @BeforeAll
    void setUp() {
        Properties properties = getProperties();
        StreamsBuilder builder = new StreamsBuilder();
        appContext.getBean(PublisherSubscriberTopology.class).withBuilder(builder);
        Topology topology = builder.build();
        topologyTestDriver = new TopologyTestDriver(topology, properties);
    }

    private Properties getProperties() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9092");
        properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogDeserializationExceptionHandler.class.getName());
        properties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProductionExceptionHandler.class.getName());
        return properties;
    }

    @Test
    void testAppFlow() {
        Custom customObj = getCustomObj();
        Result result = getResult();
        ConsumerRecordFactory<String, Custom> resultFactory =
            new ConsumerRecordFactory<>(inTopic,
                StreamsSerdes.String().serializer(), StreamsSerdes.CustomSerde().serializer());
        topologyTestDriver.pipeInput(resultFactory.create(
            inTopic,
            "1001",
            customObj
        ));
        ProducerRecord<String, Result> record =
            topologyTestDriver.readOutput(
                outputTopic,
                StreamsSerdes.String().deserializer(),
                StreamsSerdes.ResultSerde().deserializer()
            );
        assertAll(() -> assertEquals("abc123", record.value().getABC()));
    }

    private Custom getCustomObj() {
        Custom customObj = new Custom();
        //setting customObj
        return customObj;
    }

    private Result getResult() {
        Result result = new Result();
        //setting resultObj
        return result;
    }

    @AfterAll
    static void tearDown() {
        try {
            topologyTestDriver.close();
        } catch (Exception e) {
            LOG.error(e.getMessage());
        }
    }
}
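If the HTTP call has to stay where it is, another option might be to start a throwaway HTTP server inside the test and point `restCompleteUri` at it, so that the POST no longer fails with `Connection refused`. Below is a rough sketch using only the JDK (Java 11+). The `/endPointName` path comes from the error message above; `StubService`, `StubServiceDemo`, and the JSON payloads are assumptions made for illustration, and `java.net.http.HttpClient` merely stands in for `RestTemplate` to keep the example free of dependencies.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: a tiny in-process replacement for the external service.
final class StubService implements AutoCloseable {
    private final HttpServer server;

    private StubService(HttpServer server) { this.server = server; }

    // Starts a server that answers every request on `path` with `jsonBody`.
    static StubService start(String path, String jsonBody) {
        try {
            // Port 0 asks the OS for any free port on the loopback interface.
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext(path, exchange -> {
                byte[] body = jsonBody.getBytes(StandardCharsets.UTF_8);
                exchange.getRequestBody().readAllBytes(); // drain the request payload
                exchange.getResponseHeaders().add("Content-Type", "application/json");
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(body);
                }
            });
            server.start();
            return new StubService(server);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Full URI of the stubbed endpoint, including the port chosen by the OS.
    String uri(String path) {
        return "http://127.0.0.1:" + server.getAddress().getPort() + path;
    }

    @Override
    public void close() {
        server.stop(0);
    }
}

final class StubServiceDemo {
    // Sends a JSON POST, as the code under test would, and returns the response body.
    static String post(String uri, String json) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(uri))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
        try {
            return HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString())
                    .body();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        try (StubService stub = StubService.start("/endPointName", "{\"abc\":\"abc123\"}")) {
            System.out.println(post(stub.uri("/endPointName"), "{\"id\":\"1001\"}"));
        }
    }
}
```

In the test class, such a stub would be started in `setUp` before the topology is built and closed in `tearDown`, with the URI property of the application overridden to the value returned by `uri(...)`.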
【Discussion】:
Tags: java spring-mvc apache-kafka apache-kafka-streams