转发请注明原创地址:http://www.cnblogs.com/dongxiao-yang/p/7700600.html
《flink-connector-kafka consumer的topic分区分配源码》一文提到了在flink-connector-kafka的consumer初始化的时候有三种offset提交模式:KAFKA_PERIODIC,DISABLED和ON_CHECKPOINTS。
其中ON_CHECKPOINTS表示在flink做完checkpoint后主动向kafka提交offset的方法,本文主要分析一下flink-connector-kafka在源码如何使用checkpoint机制实现offset的恢复和提交。
flink conusmer的实现基类FlinkKafkaConsumerBase定义如下,这个类实现了了与checkpoin相关的三个接口CheckpointedFunction,CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>>,CheckpointListener。根据官网文档,CheckpointedRestoring的restoreState()方法已经被CheckpointedFunction的initializeState取代,所以重点关注三个方法实现
1initializeState() 实例初始化或者recover的时候调用
2snapshotState() 每次创建checkpoint的时候调用
3 notifyCheckpointComplete() 每次checkpoint结束的时候调用
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction, CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>> {