【问题标题】:Spring kafka consumer doesn't respect auto-offset-reset = latestSpring kafka 消费者不尊重 auto-offset-reset = latest
【发布时间】:2019-12-21 18:22:10
【问题描述】:

我们有 1 个 Kafka 主题和 1 个分区:

从 spring boot kafka 消费者那里看到了一个相当奇怪的行为。 Spring kafka 消费者总是在重启时从主题开始消费。 我已经将spring kafka监听器配置如下

kafka 监听器:

@KafkaListener(topics = "${application.kafkaInputTopic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(String message) {
    log.debug("SG message received. Parsing...");
    TransmissionMessage transmissionMessage;
    SGTransmission transmission = parseMessage(message);
    //Porcess Transmission......
}

消费者配置和spring消费者容器自动装配bean

@Resource
public Environment env;

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
   // I know this isnt right, should be run in 1 thread as there isonly 
   //partition in the topic
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(3000);

    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

    factory.getContainerProperties().setSyncCommits(true);
    return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap();
    propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, env.getProperty(Constants.SPRING_KAFKA_SECURITY_PROTOCOL));
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty(Constants.SPRING_KAFKA_BOOTSTRAP_SERVERS));
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty(Constants.SPRING_KAFKA_GROUP_ID));
    return propsMap;
}

spring 应用程序 yaml

kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    properties:
    consumer:
        # If this consumer does not have an offset yet, start at latest offset.
        # Be careful with `earliest`, this will use the first (available) offset in the topic, which is most likely not what you want.
        auto-offset-reset: latest
        group-id: ${KAFKA_GROUP_ID}

每次消费者崩溃并重新启动时,都会从头开始读取所有消息。正如您在 application.yaml 中看到的那样,这不应该是这种情况

自动偏移重置:最新

我可能忽略了代理端或消费者端的其他配置,这导致消费者每次重新启动时都从头开始读取??

【问题讨论】:

  • 你能显示显示哪些属性已加载的日志吗?

标签: java spring-boot apache-kafka spring-kafka


【解决方案1】:

您必须以某种方式提交初始偏移量,也许在您完成此配置之前。

factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

这意味着您有责任提交偏移量。

使用AckMode.BATCH(默认)或AckMode.RECORD

或者,使用kafka-consumer-groups CLI 工具删除当前提交的偏移量(您也可以使用相同的工具列出当前偏移量)。

或者每次使用组的 UUID 来获取一个新组。

编辑

您还可以让您的侦听器类实现ConsumerSeekAware 并在onPartitionsAssigned() 中调用callback.seekToEnd(partitions)

【讨论】:

  • 奇怪的是......我们在另一个环境中有相同的消费者(另一个 kafka 主题和消费者组 id 但相同的 java 代码)......并且消费者在重启后就好了。
  • 正确 - 这意味着该消费者至少使用earliest 运行过一次。如果您总是想最晚开始,最好使用唯一的group.id
  • 因此消费者组的初始偏移量为“4226”,在应用程序日志中它显示 [Consumer clientId=consumer-10, groupId=group-id] 分区 my-topic-dev 上的偏移量提交失败-0 在偏移量 4226:协调器不知道这个成员。
  • 你是对的;拥有比分区更多的并发性毫无意义。
  • 是的,只要提交了偏移量,auto.offset.reset 就会被忽略。
猜你喜欢
  • 2018-01-27
  • 2017-06-02
  • 1970-01-01
  • 2016-11-11
  • 2019-04-07
  • 2017-12-07
  • 2019-11-21
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多