【发布时间】:2021-05-26 05:52:10
【问题描述】:
flink 1.7 版本
我试图从保存点(或检查点)恢复 flink 作业,该作业的工作是从 kafka 读取 -> 执行 30 分钟窗口聚合(如计数器) -> 下沉到 kafka。
我使用rocksdb并启用了检查点。
现在我尝试手动触发保存点。 每个聚合的期望值为 30(1 个数据/每分钟)。 但是当我从保存点恢复时(flink run -d -s {url}),聚合值不是 30(小于 30,取决于我取消 flink 作业和恢复的时间)。 当作业正常运行时,它得到 30。
我不知道为什么有些数据似乎丢失了?
并且日志显示“FlinkKafkaConsumer 没有恢复状态”
主要代码:
source.flatMap(new FlatMapFunction<String, Model>() {
private static final long serialVersionUID = 5814342517597371470L;
@Override
public void flatMap(String value, Collector<Model> out) throws Exception {
LOGGER.info("----> catch value: " + value);
Model model = JSONObject.parseObject(value, Model.class);
out.collect(model);
}
}).uid("flatmap-1").name("flatmap-1").assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Model>() {
private static final long serialVersionUID = -1742071229344039681L;
@Override
public long extractTimestamp(Model element, long previousElementTimestamp) {
return element.getTime();
}
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Model lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp);
}
}).setParallelism(1).keyBy(Model::getDim).window(new DynamicWindowAssigner()).aggregate(new AggregateFunction<Model, Model, Model>() {
@Override
public Model createAccumulator() {
return new Model();
}
@Override
public Model add(Model value, Model accumulator) {
init(value, accumulator);
accumulator.setValue(accumulator.getValue() + 1);
return accumulator;
}
@Override
public Model getResult(Model accumulator) {
return accumulator;
}
@Override
public Model merge(Model a, Model b) {
return null;
}
private void init(Model value, Model accumulator){
if(accumulator.getTime() == 0L){
accumulator.setValue(0);
accumulator.setDim(value.getDim());
accumulator.setTime(value.getTime());
}
}
}).uid("agg-1").name("agg-1").map(new MapFunction<Model, String>() {
private static final long serialVersionUID = -1742071229344039681L;
@Override
public String map(Model value) throws Exception {
value.setTime(TimeWindow.getWindowStartWithOffset(value.getTime(), 0, TimeUnit.MINUTES.toMillis(30)));
return JSONObject.toJSONString(value);
}
}).uid("flatmap-2").name("flatmap-2").setParallelism(4).addSink(metricProducer).uid("sink").name("sink").setParallelism(2);
检查点设置:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(120000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(50000);
StateBackend stateBackend = new RocksDBStateBackend(${path}, true);
env.setStateBackend(stateBackend);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getConfig().disableSysoutLogging();
【问题讨论】:
-
“无恢复状态”仅在未使用检查点或保存点来初始化作业状态时记录,这解释了您看到不正确结果的原因。我看不出您分享的内容有任何明显错误,但也许如果您分享更多细节,问题就会变得明显。 (另外,你运行的是 Flink 1.7.2 还是更早的版本?)
-
@David Anderson thx,我使用 v1.7.1
-
@David Anderson 我检查了保存点的 hdfs 目录,它似乎包含“_meta”和其他目录(保存流状态和 kafka 偏移量?)
-
代码更新,flink集群由3个master,几个slave节点组合而成,flink运行时为flink-dist_2.11-1.7.1.jar
标签: apache-flink restore checkpoint