【Title】Kafka exactly-once messaging test with a "consume-transform-produce" integration test
【Posted】2020-12-15 17:50:27
【Question】

I am writing test cases for my application's Kafka consume-transform-produce loop: effectively, I consume from a source topic, process the message, and send it to a destination topic. I am writing these tests to demonstrate exactly-once message delivery with Kafka, since I will add failure cases later.
Here is my configuration:

private Map<String, Object> consConfigProps(boolean txnEnabled) {
    Map<String, Object> props = new HashMap<>(
            KafkaTestUtils.consumerProps(AB_CONSUMER_GROUP_ID, "false", kafkaBroker));
    props.put(ConsumerConfig.GROUP_ID_CONFIG, AB_CONSUMER_GROUP_ID);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return props;
}

private Map<String, Object> prodConfigProps(boolean txnEnabled) {
    Map<String, Object> props = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID().toString());
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
    props.put(ProducerConfig.RETRIES_CONFIG, "3");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                "prod-txn-" + UUID.randomUUID().toString());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}

public KafkaMessageListenerContainer<String, NormalUser> fetchContainer() {
    ContainerProperties containerProperties = new ContainerProperties(ABTOPIC, XYTOPIC, PATOPIC);
    containerProperties.setGroupId("groupId-10001");
    containerProperties.setAckMode(AckMode.MANUAL);
    containerProperties.setSyncCommits(true);
    containerProperties.setSyncCommitTimeout(Duration.ofMillis(5000));
    containerProperties.setTransactionManager(kafkaTransactionManager());
    KafkaMessageListenerContainer<String, NormalUser> kafkaMessageListContainer = new KafkaMessageListenerContainer<>(
            consumerFactory(), containerProperties);
    kafkaMessageListContainer.setupMessageListener(new AcknowledgingMessageListener<String, NormalUser>() {
        @Override
        public void onMessage(ConsumerRecord<String, NormalUser> record, Acknowledgment acknowledgment) {
            log.debug("test-listener received message='{}'", record.toString());
            records.add(record);
            acknowledgment.acknowledge();
        }
    });
    return kafkaMessageListContainer;
}

@Test
public void testProducerABSuccess() throws InterruptedException, IOException {
    NormalUser userObj = new NormalUser(ABTypeGood,
            Double.valueOf(Math.random() * 10000).longValue(),
            "Blah" + String.valueOf(Math.random() * 10));
    sendMessage(XYTOPIC, "AB-id", userObj);
    try {
        ConsumerRecords<String, NormalUser> records;
        parserConsumer.subscribe(Collections.singletonList(XYTOPIC));
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new LinkedHashMap<>();
        // Check for messages
        parserProducer.beginTransaction();
        records = parserConsumer.poll(Duration.ofSeconds(3));
        assertThat(1).isEqualTo(records.count()); // --> this assert passes about 50% of the time
        for (ConsumerRecord<String, NormalUser> record : records) {
            assertEquals(record.key(), "AB-id");
            assertEquals(record.value(), userObj);
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset()));
        }
        parserProducer.send(new ProducerRecord<String, NormalUser>(ABTOPIC, "AB-id", userObj));
        parserProducer.sendOffsetsToTransaction(currentOffsets, AB_CONSUMER_GROUP_ID);
        parserProducer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        parserProducer.close();
    } catch (final KafkaException e) {
        parserProducer.abortTransaction();
    }
    ConsumerRecords<String, NormalUser> records;
    loadConsumer.subscribe(Collections.singletonList(ABTOPIC));
    records = loadConsumer.poll(Duration.ofSeconds(3));
    assertThat(1).isEqualTo(records.count()); // --> this assert fails every time
    for (ConsumerRecord<String, NormalUser> record : records) {
        assertEquals(record.key(), "AB-id");
        assertEquals(record.value(), userObj);
    }
}

My problem is that the assertions in the test case "testProducerABSuccess" above are inconsistent: they sometimes pass and sometimes fail, and I have not been able to figure out why. What is wrong with the above?

Edit (16-12):

  1. Tested with ConsumerConfig.AUTO_OFFSET_RESET_CONFIG = "earliest"; no change. The first assertion passes about 70% of the time. The second assertion always fails (0% pass rate).

【Discussion】

    Tags: spring-kafka spring-kafka-test


    【Solution 1】

    Which assertion fails? If it is `assertThat(1).isEqualTo(records.count());`, it is probably because you have auto.offset.reset set to latest. It must be earliest to avoid a race condition in which the record is sent before the consumer has been assigned its partitions.
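The suggested change is a one-line config tweak. As a minimal, Kafka-free sketch (using the raw property-name strings that the `ConsumerConfig` constants resolve to, so it runs without the Kafka client on the classpath), the relevant consumer settings would look like:

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerResetConfig {

    // Raw Kafka consumer property names, with the values suggested in the answer.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("enable.auto.commit", "false");       // commit via the transaction, not automatically
        props.put("isolation.level", "read_committed"); // only see records from committed transactions
        props.put("auto.offset.reset", "earliest");     // read records sent before partition assignment
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().get("auto.offset.reset")); // → earliest
    }
}
```

With `latest`, a consumer whose group has no committed offset starts at the end of the log, so any record produced during the subscribe/rebalance window is silently skipped; `earliest` removes that race.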

    【Comments】

    • I tested with ConsumerConfig.AUTO_OFFSET_RESET_CONFIG = "earliest". The first assertion now passes about 70% of the time, more or less the same as before. The second assertion still always fails (0% pass rate). The log when the second assertion fails: org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatResponseHandler: [Consumer clientId=consumer-ebs-10001-1, groupId=ebs-10001] Attempt to heartbeat failed since group is rebalancing. This goes on forever.
    • You should put the consumers in different groups; you should also close the first consumer. Also, consider using assign() instead of subscribe().
    • Thanks Gary, you made my day. The solution was: 1. create a separate group for each consumer; 2. pass the same consumer group to producer.sendOffsetsToTransaction; 3. assign() partitions to the consumers, and have the producer send to explicit partitions as well, for better control over the assertions. The test case now passes. Cheers.
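One detail worth double-checking in the question's code alongside the fixes above: the map passed to `sendOffsetsToTransaction` is built with `new OffsetAndMetadata(record.offset())`, but the committed value must be the offset of the *next* record to read, i.e. `record.offset() + 1`; committing the last-consumed offset reprocesses one record after a restart. A minimal sketch of that offset arithmetic, with a hypothetical `Partition` record standing in for Kafka's `TopicPartition` so it runs without a broker or the Kafka client:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OffsetMapSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
    // for illustration only.
    record Partition(String topic, int partition) {}

    // Given the last offset consumed per partition, produce the map to commit:
    // sendOffsetsToTransaction expects lastConsumedOffset + 1 per partition.
    static Map<Partition, Long> offsetsToCommit(Map<Partition, Long> lastConsumed) {
        Map<Partition, Long> out = new LinkedHashMap<>();
        lastConsumed.forEach((tp, off) -> out.put(tp, off + 1));
        return out;
    }

    public static void main(String[] args) {
        Map<Partition, Long> seen = new LinkedHashMap<>();
        seen.put(new Partition("XYTOPIC", 0), 41L); // last record consumed had offset 41
        System.out.println(offsetsToCommit(seen));  // committed offset for the partition is 42
    }
}
```

In the real test this corresponds to replacing `new OffsetAndMetadata(record.offset())` with `new OffsetAndMetadata(record.offset() + 1)` inside the consume loop.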