【问题标题】:Getting Exception while inspecting flink savepoint using state processor api使用状态处理器 api 检查 flink 保存点时出现异常
【发布时间】:2020-01-21 18:45:42
【问题描述】:

我在线程“main”java.lang.IllegalAccessError 中遇到异常:类 org.apache.flink.state.api.runtime.SavepointLoader 试图访问受保护的方法 org.apache.flink.runtime.state.filesystem。 AbstractFsCheckpointStorage.resolveCheckpointPointer(Ljava/lang/String;)Lorg/apache/flink/runtime/state/CompletedCheckpointStorageLocation; (org.apache.flink.state.api.runtime.SavepointLoader 和 org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage 在 loader 'app' 的未命名模块中)

使用 flink 1.8。 使用下面的 Maven 仓库:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-state-processor-api_2.12</artifactId>
      <version>1.9.1</version>
    </dependency>

源码sn-p

        ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(bEnv, "/home/utlesh/Documents/savepoint", new MemoryStateBackend()) ;
        savepoint.readListState("input-events-source-01", "Custom Source", TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>(){}));

在调用下面函数的第二行获取异常

    public static ExistingSavepoint load(ExecutionEnvironment env, String path, StateBackend stateBackend) throws IOException {
    org.apache.flink.runtime.checkpoint.savepoint.Savepoint savepoint = SavepointLoader.loadSavepoint(path);
    ...
    ...
}

调用下面的函数:

    package org.apache.flink.state.api.runtime;

    public static Savepoint loadSavepoint(String savepointPath) throws IOException {
        CompletedCheckpointStorageLocation location = AbstractFsCheckpointStorage
            .resolveCheckpointPointer(savepointPath);

        try (DataInputStream stream = new DataInputStream(location.getMetadataHandle().openInputStream())) {
            return Checkpoints.loadCheckpointMetadata(stream, Thread.currentThread().getContextClassLoader());
        }
    }

调用下面的函数:

    package org.apache.flink.runtime.state.filesystem;

    protected static CompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException {
        checkNotNull(checkpointPointer, "checkpointPointer");
        checkArgument(!checkpointPointer.isEmpty(), "empty checkpoint pointer");
       ...
       ...
}

如果我们仔细看,这里调用了不同包的保护函数。这是 flink maven repo 中的错误还是我使用错误的方式? 还有其他方法可以反序列化或读取 flink 保存点和检查点吗?

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    您的 flink 似乎存在依赖版本不匹配

    将以下依赖项添加到 pom.xml 并重新构建,同时从同一文件中删除 flink-clients 的旧版本依赖项。

    <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.11</artifactId>
          <version>1.9.1</version>
    </dependency>
    

    【讨论】:

      【解决方案2】:

      状态处理器 API 只能用于运行 Flink 1.9 或更高版本的批处理作业,但它可以用于读取运行旧版本 Flink(返回到 Flink 1.6)的流式作业写入的保存点和检查点。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2023-01-18
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多