【问题标题】:Spring kafka embedded testingSpring kafka 嵌入式测试
【发布时间】:2018-11-06 20:40:12
【问题描述】:

我正在使用嵌入式 kafka 编写一个 junit 测试用例。我们有一个管道,其中生产者 > 主题 > 消费者 > 工作() > 生产。 我正在使用第三方模式注册表(通过提供虚假 url 来模拟我的测试)和与之相关的特定 serdes。在 kafka 用户组上讨论了这个之后,这样做的方法是使用 一个模拟注册表,用于手动序列化数据并将 byte[] 本身传递给生产者而不是 avro 记录。在这种情况下,我的消费者怎么会失败,因为它需要一个特定的记录有效负载。关于如何解决这个问题的任何想法?

//Listener method


    */
        @KafkaListener(topics = test1,id="tesId1")
        public void onMessage(@Payload Log log, 
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                @Header(KafkaHeaders.OFFSET) Long offset) throws Exception
                 {

               }


    // test class
    @RunWith(SpringRunner.class)
    @SpringBootTest
    @DirtiesContext
    @EmbeddedKafka(topics = { "test1" })
    @TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
    public class ConsumerTests {

    }

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    只需使用原始KafkaTemplate(无泛型)和字节数组序列化器。

    例如,使用 JSON 和 StringSerializer

    @SpringBootApplication
    public class So53179695Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So53179695Application.class, args);
        }
    
        @Bean
        public RecordMessageConverter converter() {
            return new StringJsonMessageConverter();
        }
    
        @KafkaListener(id = "foo", topics = "foo")
        public void listen(Foo in) {
            System.out.println(in);
        }
    
        public static class Foo {
    
            private String bar;
    
            public Foo() {
                super();
            }
    
            Foo(String bar) {
                this.bar = bar;
            }
    
            public String getBar() {
                return this.bar;
            }
    
            public void setBar(String bar) {
                this.bar = bar;
            }
    
            @Override
            public String toString() {
                return "Foo [bar=" + this.bar + "]";
            }
    
        }
    
    }
    

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class So53179695ApplicationTests {
    
        @ClassRule
        public static EmbeddedKafkaRule embeddedKafka = 
            new EmbeddedKafkaRule(1, false, "foo");
    
        @BeforeClass
        public static void setup() {
            System.setProperty("spring.kafka.bootstrap-servers",
                embeddedKafka.getEmbeddedKafka().getBrokersAsString());
        }
    
        @Autowired
        public KafkaTemplate<String, Foo> template;
    
        @SuppressWarnings("rawtypes")
        @Autowired
        public KafkaTemplate rawTemplate;
    
        @SuppressWarnings("unchecked")
        @Test
        public void test() throws Exception {
    //      template.send("foo", new Foo("bar"));
            rawTemplate.send("foo", "{\"bar\":\"baz\"}");
            Thread.sleep(10_000);
        }
    
    }
    

    Foo [bar=baz]
    

    请注意,两个模板都指向同一个物理对象 - 由于 java 的类型擦除,在运行时并不重要。

    这假设您仍在消费者端使用 Avro 反序列化器(或本示例中的 JSON)。

    或者你可以在消费者端使用你的模拟反序列化器来创建一个Log

    【讨论】:

    • 在这个测试用例中,我还想模拟一个消费者工厂 bean,它有一组不同的配置指向嵌入式 kafka 服务器。所以我想的是为消费者工厂创建一个模拟 bean测试类和它并将其设置在侦听器容器中。我是 Spring Boot 新手,所以想看看是否有更好的方法?
    • 你不需要模拟工厂;只需使用属性占位符并在测试中覆盖其值。比如使用spring boot时,覆盖spring.kafka.bootstrap-servers;我已经编辑了答案(它为嵌入式代理使用了新的 2.2 名称,但同样适用于旧的 KafkaEmbedded 规则。如果您不使用 Boot,则可以使用相同的技术。
    猜你喜欢
    • 2016-12-03
    • 1970-01-01
    • 2021-03-17
    • 2021-07-15
    • 2020-06-14
    • 1970-01-01
    • 2019-11-11
    • 2018-07-23
    • 2023-03-10
    相关资源
    最近更新 更多