【问题标题】:Spring Kafka Embedded - topic already exists between testsSpring Kafka Embedded - 测试之间已经存在主题
【发布时间】:2020-01-24 16:26:05
【问题描述】:

我使用嵌入式 kafka (spring-kafka-test) 创建了一组测试 (JUnit 5),当我有时(并非总是)运行它们时,我得到“主题 'some_name' 已经存在 " 在单次运行中进行一项或多项测试。

所有测试都使用相同的主题名称(我不想为每个测试更改该名称),测试类具有 DirtiesContext 注释(AFTER_EACH_TEST_METHOD)。我不知道这个问题的原因是什么,以及如何解决它。

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@ActiveProfiles("test")
public class RemovalKafkaTestIT {
    private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);
    private final static String SERVER_ADDRES = "127.0.0.1:9092";

    private Consumer<String, String> prepareConsumer() {
        Map<String, Object> configsConsumer = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        configsConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configsConsumer.put("bootstrap.servers", SERVER_ADDRES);
        Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(configsConsumer, new StringDeserializer(), new StringDeserializer()).createConsumer();
        consumer.subscribe(singleton("some_name"));
        return consumer;
    }

    @Test
    public void someMethodWithKafka1() {
        // some logic
        ...
        // check topic content 
        Consumer<String, String> consumer = this.prepareConsumer();
        embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");

        ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
        assertThat(records.count()).isEqualTo(1); // and other checks :)

        // clean
        consumer.commitSync();
        consumer.close();
    }

    @Test
    public void someMethodWithKafka2() {
        // some other logic
        ...
        // check topic content 
        Consumer<String, String> consumer = this.prepareConsumer();
        embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");

        ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
        assertThat(records.count()).isEqualTo(1); // and other checks :)

        // clean
        consumer.commitSync();
        consumer.close();
    }
}

【问题讨论】:

  • 您是否只有一个带有 EmbeddedKafka 的测试文件,或者您是否有其他测试类在做同样的事情?
  • 只有一个文件,测试用例很少。

标签: java spring-kafka junit5


【解决方案1】:

您有两个经纪人;您自己创建的:

private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);

还有一个由 Spring 管理:

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)

当您将@EmbeddedKafka 与 Spring 测试上下文一起使用时;代理被添加到上下文中。

改成

@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;

并且不要添加另一个 bean。

通常,为每个测试使用不同的主题更容易(也更快);避免为每个测试创建代理。

编辑

ports = 9092

改用随机端口(省略此配置)并使用

configsConsumer.put("bootstrap.servers", this.embeddedKafkaBroker.getBrokersAsString());

【讨论】:

  • 问题仍然存在。正如我之前所说,我不想在测试中使用多个主题。
  • 查看我的答案的编辑;您不应为每个实例使用固定端口。但是,请理解,在为每个测试使用新代理时,您的测试将需要更长的时间来运行。
  • 仍然,在从类注释中删除端口并在代码中使用 getBrokersAsString() 设置端口后,我得到相同的错误(随机)。
  • 这毫无意义;如果每个实例都有一个新端口,则该主题不可能已经存在。其他事情正在发生。
  • 你知道,它正在工作,所以我可能在星期五做错了什么。谢谢你。但这仍然只是解决方法问题。
【解决方案2】:

尝试将您的 EmbeddedKafkaBroker 标记为 bean 或使用 @Autowire 从顶级注释创建代理。

因为代理没有被标记为 bean,所以它的生命周期不会由应用程序上下文管理,也不会在 @DirtiesContext 的运行之间被清理。也许它正在坚持使主题跨越测试边界的东西。

【讨论】:

  • 在 Configuration 中将 EmbeddedKafkaBroker 创建为 Bean 并通过 Auwired 连接到测试类后,问题仍然存在。
猜你喜欢
  • 2017-07-05
  • 1970-01-01
  • 2020-06-22
  • 1970-01-01
  • 2015-09-12
  • 1970-01-01
  • 1970-01-01
  • 2016-12-15
  • 1970-01-01
相关资源
最近更新 更多