【Posted】: 2019-12-21 18:22:10
【Question】:
We have one Kafka topic with one partition.
I am seeing rather strange behavior from a Spring Boot Kafka consumer: it always starts consuming from the beginning of the topic on restart. I have configured the Spring Kafka listener as follows.
The Kafka listener:
@KafkaListener(topics = "${application.kafkaInputTopic}", groupId = "${spring.kafka.consumer.group-id}")
public void listen(String message) {
    log.debug("SG message received. Parsing...");
    TransmissionMessage transmissionMessage;
    SGTransmission transmission = parseMessage(message);
    // Process the transmission...
}
The consumer configuration and autowired Spring consumer container beans:
@Resource
public Environment env;

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // I know this isn't right; it should run in 1 thread,
    // as there is only one partition in the topic
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(3000);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    factory.getContainerProperties().setSyncCommits(true);
    return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, env.getProperty(Constants.SPRING_KAFKA_SECURITY_PROTOCOL));
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty(Constants.SPRING_KAFKA_BOOTSTRAP_SERVERS));
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty(Constants.SPRING_KAFKA_GROUP_ID));
    return propsMap;
}
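One thing worth checking here: because `consumerConfigs()` builds the consumer properties map by hand (and the custom `ConsumerFactory` bean replaces Spring Boot's auto-configured one), the `spring.kafka.*` values from the yaml are not merged into it; any key not put in the map falls back to the Kafka client default. A minimal sketch of inspecting which keys the map actually carries, with the class name and placeholder values made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class EffectiveConsumerConfig {

    // Mirrors the hand-built bean above, using the raw Kafka property
    // names instead of the ConsumerConfig constants.
    static Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        // Note: neither "auto.offset.reset" nor "enable.auto.commit"
        // is set here, so the client defaults apply regardless of
        // what application.yaml says.
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = consumerConfigs();
        System.out.println(props.containsKey("auto.offset.reset"));  // false
        System.out.println(props.containsKey("enable.auto.commit")); // false
    }
}
```

Logging the map like this at startup (or reading the `ConsumerConfig values:` block the Kafka client itself logs) shows which settings actually reached the consumer.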
The Spring application yaml:
kafka:
  bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
  properties:
    consumer:
      # If this consumer does not have an offset yet, start at latest offset.
      # Be careful with `earliest`, this will use the first (available) offset in the topic, which is most likely not what you want.
      auto-offset-reset: latest
      group-id: ${KAFKA_GROUP_ID}
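As an aside, for Spring Boot itself to bind these consumer settings, the conventional nesting is `spring.kafka.consumer.*` rather than `spring.kafka.properties.consumer.*`; a sketch of that layout (assuming the block sits under a top-level `spring:` key):

```yaml
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    consumer:
      auto-offset-reset: latest
      group-id: ${KAFKA_GROUP_ID}
```

Even then, these values only take effect if the auto-configured consumer factory is used rather than a hand-built properties map.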
Every time the consumer crashes and restarts, it reads all messages from the beginning of the topic. As you can see in application.yaml, that should not be the case, since it sets:
auto-offset-reset: latest
Is there some other configuration on the broker side or the consumer side that I have overlooked, which causes the consumer to read from the beginning on every restart?
【Comments】:
- Can you show the logs that show which properties were loaded?
Tags: java spring-boot apache-kafka spring-kafka