可以使用ConsumerRebalanceListener接口中的onPartitionsRevoked回调方法。
来自description,
用户可以实现的回调方法,以提供对自定义存储的偏移提交的处理。
当消费者必须放弃一些分区时,将在重新平衡操作期间调用此方法。
也可以在消费者关闭或取消订阅时调用。
建议在此回调中将偏移量提交到 Kafka 或自定义偏移量存储,以防止重复数据。
示例实现:
private static class ConsumerPartitionAssignmentListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions revoke listener: %s", partitions.toString()));
// Add your changes here to flush
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions assignment listener: %s", partitions.toString()));
}
}
示例:
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class TestConsumer {
KafkaConsumer<String, String> kafkaConsumer;
public static void main(String[] args) {
TestConsumer consumer = new TestConsumer();
consumer.pollMessages();
}
public TestConsumer() {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-example-consumer");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(Arrays.asList("input-topic"), new ConsumerPartitionAssignmentListener());
}
public void pollMessages() {
while(true) {
System.out.println("Polling");
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(5000));
System.out.println(records.count());
}
}
private static class ConsumerPartitionAssignmentListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions revoke listener: %s", partitions.toString()));
// Add your changes here to flush
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println(String.format("Partitions assignment listener: %s", partitions.toString()));
}
}
}
输出:
Poll
Partitions assignment listener: [input-topic-0]
0
Poll
0
Poll
Partitions revoke listener: [input-topic-0]
Partitions assignment listener: [input-topic-0]
0
Poll
0