【问题标题】:Spring Kafka Test - Not receiving data in @KafkaListener with EmbeddedKafkaSpring Kafka 测试 - 没有在 @KafkaListener 中使用 EmbeddedKafka 接收数据
【发布时间】:2020-06-23 15:37:59
【问题描述】:

我们正在使用 Cucumber 对 out 应用程序进行一些集成测试,并且在测试 @KafkaListener 时遇到了一些问题。我们设法使用 EmbeddedKafka 并在其中生成数据。

但消费者从未收到任何数据,我们也不知道发生了什么。

这是我们的代码:

生产者配置

@Configuration
@Profile("test")
public class KafkaTestProducerConfig {

    private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                embeddedKafka.getBrokersAsString());
        props.put(SCHEMA_REGISTRY_URL, "URL");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericRecord> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, GenericRecord> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

消费者配置

@Configuration
@Profile("test")
@EnableKafka
public class KafkaTestConsumerConfig {

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;

    private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        props.put(SCHEMA_REGISTRY_URL, "URL");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
        return props;
    }

    @Bean
    public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
        KafkaAvroDeserializer avroDeserializer = new KafkaAvroDeserializer();
        avroDeserializer.configure(consumerProperties(), false);
        return new DefaultKafkaConsumerFactory<>(consumerProperties(), new StringDeserializer(), avroDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

}

集成测试

@SpringBootTest(
        webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
        classes = Application.class)
@ActiveProfiles("test")
@EmbeddedKafka(topics = {"TOPIC1", "TOPIC2", "TOPIC3"})
public class CommonStepDefinitions implements En {

    protected static final Logger LOGGER = LoggerFactory.getLogger(CommonStepDefinitions.class);

    @Autowired
    protected KafkaTemplate<String, GenericRecord> kafkaTemplate;

}

步骤定义

public class KafkaStepDefinitions extends CommonStepDefinitions {

    private static final String TEMPLATE_TOPIC = "TOPIC1";

    public KafkaStepDefinitions(){
        Given("given statement", () -> {
            OperationEntity operationEntity = new OperationEntity();
            operationEntity.setFoo("foo");
            kafkaTemplate.send(TEMPLATE_TOPIC, AvroPojoTransformer.pojoToRecord(operationEntity));
        });
    }

}

消费者 同样的代码在生产 Bootstrap 服务器上运行良好,但嵌入式 Kafka 从未达到过

@KafkaListener(topics = "${kafka.topic1}", groupId = "groupId")
    public void consume(List<GenericRecord> records, Acknowledgment ack) throws DDCException {
        LOGGER.info("Batch of {} records received", records.size());
        //do something with the data
        ack.acknowledge();
    }

日志中的一切看起来都很好,但我们不知道缺少什么。

提前致谢。

【问题讨论】:

  • 对我来说一切都很好;你确定记录是公开的吗?你能把日志贴在某个地方(pastebin 等)吗?
  • 日志中唯一看起来不好的是这个错误:删除时出错 ...\AppData\Local\Temp\kafka-7678436650062051819\TOPIC1-0\00000000000000000000.index:进程无法访问该文件,因为它正被另一个进程使用
  • 是的 - 这是关闭期间 Windows 上的一个(尚未解决的)问题。如果您收到partitions assigned INFO 日志,我几乎可以保证问题出在发布方面。再次,如果您可以发布日志,我可以看看。
  • 这是集成测试的完整日志:pastebin.com/FjYmJ2TW

标签: java spring-boot apache-kafka spring-kafka spring-kafka-test


【解决方案1】:

问题是消费者没有连接到嵌入式 Kafka。为此,您可以使用 test 配置文件运行测试并将以下内容添加到 application-test.yml

spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}

那么您也不需要自定义 consumerPropertiesconsumerFactorykafkaListenerContainerFactory bean。 Spring Boot 将为您自动装配这些。如果您确实希望使用这些 bean(不知道为什么),您应该仔细检查 KafkaAutoConfiguration 以确保您覆盖了正确的名称类型。

【讨论】:

  • 我试图删除我的bean并使用应用程序测试来配置它,仍然没有
  • 我对 Cucumber 不熟悉;你的@SpringBootTest 类中没有@Test 方法。无论如何,您的测试在开始之前就结束了;查看包含0-0-C-1的线程名称;消费者在启动后不到一秒就停止了。
  • 我刚刚检查了一下,没有,我的测试正在执行,因为您可以在日志的第 1174 行看到 ProducerConfig 值的日志。该日志出现在kafkaTemplate.send(topic, entity) 之后。我不使用@Test,因为在 Cucumber 中有 stepDefinitions。您可以在我的帖子中看到代码。
  • 首先尝试将其编写为 JUnit 测试可能会很好。消除一些复杂性,让更多的受众了解问题。
【解决方案2】:

您的测试在开始之前就结束了;查看包含 0-0-C-1 的线程名称;消费者在启动后不到一秒就停止了。

我刚刚检查了一下,没有,我的测试正在执行,因为您可以在日志的第 1174 行看到 ProducerConfig 值的日志。并且该日志出现在 kafkaTemplate.send(topic, entity) 之后。我不使用@Test,因为在 Cucumber 中你有 stepDefinitions。您可以在我的帖子中看到代码。

好的;但是您需要在测试中使用某种闩锁来等待消费者实际获得分配的主题/分区并接收数据。您现在构建测试的方式是在消费者完全启动之前关闭测试。请参阅我对this question 的回答,了解一种包装侦听器的方法,以便您可以等到收到记录。 (这使用普通的 JUnit 测试)。

另一种技术是以某种方式将服务注入到您的侦听器 bean 中,该 bean 对闩锁进行倒计时。

作为快速测试,将Thread.sleep(10_000) 添加到您的“步骤”中。

但是,您大概会想以某种方式断言消费者确实获得了数据。您需要在测试退出之前进行该断言,并且因为它是异步的,所以您需要一些机制来等待它发生(或超时)。

【讨论】:

    【解决方案3】:

    EmbeddedKafka 也面临同样的问题 尝试使用 KafkaContainer

    @ActiveProfiles({"test"})
    @RunWith(Cucumber.class)
    @CucumberOptions(features= {"src/test/resources/cucumber/data.feature"},
            plugin = {"pretty", "json:target/cucumber.json"})
    @SpringBootTest(classes = MyApplication.class)
    public final class MyApplicationCucumberTest {
        private MyApplicationCucumberTest() {}
    
        @Container
        private static KafkaContainer kafkaContainer = new KafkaTestContainer();
    
    
        @BeforeClass
        public static void beforeClass() throws IOException, TTransportException {
    
            kafkaContainer.start();
            System.out.println("Kafka Bootstrap server : " + kafkaContainer.getBootstrapServers());
            System.setProperty("spring.kafka.bootstrap", kafkaContainer.getBootstrapServers());
            System.out.println("Kafka Bootstrap server : " + System.getProperty("spring.kafka.bootstrap"));
            try {
                // Create Topic 
                kafkaContainer.execInContainer("/bin/sh", "-c", "/usr/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my.topic");
                
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println(e);
            }
    
        }
    
    
    }
    

    还要在application.yml中添加spring.kafka.consumer.auto-offset-reset:最早

    public class KafkaTestContainer extends KafkaContainer {
        private static final String KAFKA_DOCKER = "confluentinc/cp-kafka:5.4.3";
    
        public KafkaTestContainer() {
            super(DockerImageName.parse(KAFKA_DOCKER));
            
    
        }
    }
    

    【讨论】:

      猜你喜欢
      • 2022-01-05
      • 1970-01-01
      • 2019-01-20
      • 1970-01-01
      • 2019-09-23
      • 2019-08-24
      • 2019-09-16
      • 2021-11-09
      • 1970-01-01
      相关资源
      最近更新 更多