【问题标题】:Why flink can't restore from a savepoint为什么 flink 无法从保存点恢复
【发布时间】:2021-05-26 05:52:10
【问题描述】:

flink 1.7 版本

我试图从保存点(或检查点)恢复 flink 作业,该作业的工作是从 kafka 读取 -> 执行 30 分钟窗口聚合(如计数器) -> 下沉到 kafka。

我使用rocksdb并启用了检查点。

现在我尝试手动触发保存点。 每个聚合的期望值为 30(1 个数据/每分钟)。 但是当我从保存点恢复时(flink run -d -s {url}),聚合值不是 30(小于 30,取决于我取消 flink 作业和恢复的时间)。 当作业正常运行时,它得到 30。

我不知道为什么有些数据似乎丢失了?

并且日志显示“FlinkKafkaConsumer 没有恢复状态”

主要代码:

        source.flatMap(new FlatMapFunction<String, Model>() {
        private static final long serialVersionUID = 5814342517597371470L;

        @Override
        public void flatMap(String value, Collector<Model> out) throws Exception {
            LOGGER.info("----> catch value: " + value);
            Model model =  JSONObject.parseObject(value, Model.class);
            out.collect(model);
        }
    }).uid("flatmap-1").name("flatmap-1").assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Model>() {

        private static final long serialVersionUID = -1742071229344039681L;

        @Override
        public long extractTimestamp(Model element, long previousElementTimestamp) {
            return element.getTime();
        }

        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(Model lastElement, long extractedTimestamp) {
            return new Watermark(extractedTimestamp);
        }
    }).setParallelism(1).keyBy(Model::getDim).window(new DynamicWindowAssigner()).aggregate(new AggregateFunction<Model, Model, Model>() {
        @Override
        public Model createAccumulator() {
            return new Model();
        }

        @Override
        public Model add(Model value, Model accumulator) {
            init(value, accumulator);
            accumulator.setValue(accumulator.getValue() + 1);
            return accumulator;
        }

        @Override
        public Model getResult(Model accumulator) {
            return accumulator;
        }

        @Override
        public Model merge(Model a, Model b) {
            return null;
        }

        private void init(Model value, Model accumulator){
            if(accumulator.getTime() == 0L){
                accumulator.setValue(0);
                accumulator.setDim(value.getDim());
                accumulator.setTime(value.getTime());
            }
        }
    }).uid("agg-1").name("agg-1").map(new MapFunction<Model, String>() {
        private static final long serialVersionUID = -1742071229344039681L;

        @Override
        public String map(Model value) throws Exception {
            value.setTime(TimeWindow.getWindowStartWithOffset(value.getTime(), 0, TimeUnit.MINUTES.toMillis(30)));
            return JSONObject.toJSONString(value);
        }
    }).uid("flatmap-2").name("flatmap-2").setParallelism(4).addSink(metricProducer).uid("sink").name("sink").setParallelism(2);

检查点设置:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(60000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(120000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(50000);
    StateBackend stateBackend = new RocksDBStateBackend(${path}, true);
    env.setStateBackend(stateBackend);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getConfig().disableSysoutLogging();

【问题讨论】:

  • “无恢复状态”仅在未使用检查点或保存点来初始化作业状态时记录,这解释了您看到不正确结果的原因。我看不出您分享的内容有任何明显错误,但也许如果您分享更多细节,问题就会变得明显。 (另外,你运行的是 Flink 1.7.2 还是更早的版本?)
  • @David Anderson thx,我使用 v1.7.1
  • @David Anderson 我检查了保存点的 hdfs 目录,它似乎包含“_meta”和其他目录(保存流状态和 kafka 偏移量?)
  • 代码更新,flink集群由3个master,几个slave节点组合而成,flink运行时为flink-dist_2.11-1.7.1.jar

标签: apache-flink restore checkpoint


【解决方案1】:

最后事实证明我应该使用 flink run -s {savepoint} -d xxx.jar flink 的 instesad 运行 -d xxx.jar -s {savepoint},如果“-d”标志在“-s”标志前面,则 flink 以某种方式忽略“-s”

【讨论】:

    猜你喜欢
    • 2020-10-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-16
    • 1970-01-01
    • 2021-01-31
    • 1970-01-01
    相关资源
    最近更新 更多