【问题标题】:Flink state backend config with the state processor api使用状态处理器 api 进行 Flink 状态后端配置
【发布时间】:2021-09-08 01:34:17
【问题描述】:

我使用 state-processor-api,因为它被发布来引导我的 flink 状态。我使用 RocksDBStateBackend 并且它有效。 我们最近去了 flink 1.13,RocksDBStateBackend 被弃用,取而代之的是 EmbeddedRocksDBStateBackend。

我的问题:

由于 API 的变化和我开发的新引导作业,我得到了以下异常:

Caused by: java.io.IOException: Size of the state is large than the 最大允许的内存支持状态。尺寸=85356498,最大尺寸=5242880 .考虑使用不同的状态后端,例如文件系统状态 后端。

在这里我声明我的 statebackend:

val backend = new EmbeddedRocksDBStateBackend(true)

我在这里创建我的保存点:

  Savepoint
    .create(backend, MAX_PARALLELISM)
    .withOperator("my_operator", transformMyOperator)
    .write(savepointPath)

另外,我的 flink 集群配置为使用 RocksDB 状态后端, 和所有其他 flink 拓扑都使用 RocksDB 后端。

所以我想知道为什么我得到一个异常说我不应该使用内存状态后端,因为我使用的是 RocksDB。欢迎任何帮助。

【问题讨论】:

    标签: apache-flink


    【解决方案1】:

    这是由 1.13 中的错误引起的,请参阅 FLINK-23728,运行 1.14.0-RC0 确实为我解决了这个问题。

    【讨论】:

      【解决方案2】:

      在 Flink 1.13 中,状态后端的选择与检查点存储提供程序的选择分离。

      我猜想以前您依赖 RocksDBStateBackend 构造函数来指定您希望存储检查点的位置。现在您应该在flink-conf.yaml

      中进行配置
      state.checkpoints.dir: file:///checkpoint-dir/
      

      或在您的代码中

      env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
      

      更多详情请见Migrating from Legacy Backends

      【讨论】:

      • 我已经在我的 jobManager 上设置了这个配置。那么也许是因为我使用的是 ExecutionEnvironment 而不是 StreamExecutionEnvironment ?我不能使用流一,因为我需要状态处理器 api 的 DataSet api。附带说明一下,我们不能在代码中的 ExecutionEnvironment 上设置检查点配置。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-03-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多