【Posted】:2025-12-24 17:35:11
【Question】:
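In a Spring project I am using Kafka, and I now want to write a method that takes "TopicName" and "GroupId" as parameters and computes the difference between the last offsets of the topic's partitions and the offsets consumed by the group.
I already have the last offsets; now I need to get the consumed offsets to compute the difference.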
public ResponseEntity<Offsets> deltaoffsets(@RequestParam(name = "groupId") String groupId, @RequestParam(name = "topic") String topic) {
    Map<String, Object> properties = (Map) kafkaLocalConsumerConfig.get("kafkaLocalConsumerConfig");
    properties.put("group.id", groupId);
    properties.put("enable.auto.commit", "true");
    List<TopicPartition> partition = new ArrayList<>();
    KafkaConsumer<String, RefentialToReload> kafkaLocalConsumer = new KafkaConsumer<>(properties);
    // Look up the partitions of the requested topic
    Map<String, List<PartitionInfo>> topics = kafkaLocalConsumer.listTopics();
    List<PartitionInfo> partitionInfos = topics.get(topic);
    if (partitionInfos == null) {
        log.warn("Partition information was not found for topic");
    } else {
        for (PartitionInfo partitionInfo : partitionInfos) {
            TopicPartition topicPartition = new TopicPartition(topic, partitionInfo.partition());
            partition.add(topicPartition);
            log.info("partition assigned to kafkaLocalConsumer");
        }
    }
    // Assign the collected partitions; without this call assignment() is empty
    kafkaLocalConsumer.assign(partition);
    // Get the last offsets of the topic partitions
    Map<TopicPartition, Long> OffsetsTopicpartition = kafkaLocalConsumer.endOffsets(kafkaLocalConsumer.assignment());
    // Here I need to get the consumed offsets
}
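The difference being asked for is usually called consumer lag: for each partition, the end offset minus the offset the group has committed. In the Kafka Java client the committed side can be read with `KafkaConsumer.committed(Set<TopicPartition>)`, which returns a `Map<TopicPartition, OffsetAndMetadata>` whose value is null for a partition the group has never committed. The arithmetic itself can be sketched without a broker, as below. This is only an illustration under stated assumptions: the class `OffsetLag` and its methods are hypothetical names, partitions are keyed by plain partition number as a stand-in for `TopicPartition`, and a partition without a committed offset is assumed to count from 0, which may not match the group's `auto.offset.reset` behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka or of the question's code base.
public class OffsetLag {

    /**
     * Returns, per partition, end offset minus committed offset.
     * Keys are partition numbers of one topic (stand-in for TopicPartition).
     * ASSUMPTION: a partition with no committed offset is treated as committed = 0.
     */
    public static Map<Integer, Long> lagPerPartition(Map<Integer, Long> endOffsets,
                                                     Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            Long committed = committedOffsets.get(e.getKey());
            long consumed = (committed == null) ? 0L : committed;
            // Clamp at 0 in case the committed offset is read after the end offset moved.
            lag.put(e.getKey(), Math.max(0L, e.getValue() - consumed));
        }
        return lag;
    }

    /** Sums the per-partition lag into one number for the whole topic. */
    public static long totalLag(Map<Integer, Long> lag) {
        long total = 0L;
        for (long value : lag.values()) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        // Sample values standing in for the results of endOffsets(...) and committed(...).
        Map<Integer, Long> end = new HashMap<>();
        end.put(0, 120L);
        end.put(1, 80L);
        end.put(2, 50L);

        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 100L);
        committed.put(1, 80L);
        // Partition 2 has no committed offset.

        Map<Integer, Long> lag = lagPerPartition(end, committed);
        System.out.println(lag + " total=" + totalLag(lag)); // prints {0=20, 1=0, 2=50} total=70
    }
}
```

In the method from the question, the two input maps would be built from `OffsetsTopicpartition` and from the `offset()` of each non-null `OffsetAndMetadata` returned by `committed(...)`.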
【Discussion】:
-
Do you want to see how many messages are still left to read?
Tags: java spring-boot apache-kafka spring-kafka