【发布时间】:2018-11-06 20:40:12
【问题描述】:
我正在使用嵌入式 kafka 编写一个 junit 测试用例。我们有一个管道,其中生产者 > 主题 > 消费者 > 工作() > 生产。 我正在使用第三方模式注册表(通过提供虚假 url 来模拟我的测试)和与之相关的特定 serdes。在 kafka 用户组上讨论了这个之后,这样做的方法是使用 一个模拟注册表,用于手动序列化数据并将 byte[] 本身传递给生产者而不是 avro 记录。在这种情况下,我的消费者怎么会失败,因为它需要一个特定的记录有效负载。关于如何解决这个问题的任何想法?
//Listener method
*/
@KafkaListener(topics = test1,id="tesId1")
public void onMessage(@Payload Log log,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) Long offset) throws Exception
{
}
// test class
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "test1" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ConsumerTests {
}
【问题讨论】:
标签: spring-kafka