【问题标题】:How to perform checkpointing in apache beam while using flink runner?使用 flink runner 时如何在 apache Beam 中执行检查点?
【发布时间】:2020-10-26 07:47:25
【问题描述】:
我正在从一个未绑定的源(Kafka)阅读并将其字数写入其他 Kafka 主题。现在我想在光束管道中执行检查点。我已按照 apache beam 文档中的所有说明进行操作,但即使在此之后也没有创建检查点目录。
以下是我用于管道的参数:-
--runner=FlinkRunner
--streaming=true
--parallelism=2
--checkpointingInterval=1000
--checkpointTimeoutMillis=5000
--minPauseBetweenCheckpoints=500
--externalizedCheckpointsEnabled=true
--retainExternalizedCheckpointsOnCancellation=true
谁能帮我检查点?
【问题讨论】:
标签:
apache-flink
apache-beam-io
apache-beam
apache-beam-kafkaio
【解决方案1】:
我已经研究过解决方案,一个是你可以更改链接集群的 flink-conf.yaml 中的 checkpoint.state.dir 路径,另一个是使用 flinkPipelineOptions-
@Description(
"Sets the state backend factory to use in streaming mode. "
+ "Defaults to the flink cluster's state.backend configuration.")
Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);
并通过设置 setStateBackendFactory(我已经使用自定义类完成了)
static class bakend implements FlinkStateBackendFactory{
@Override
public StateBackend createStateBackend(FlinkPipelineOptions options) {
return new FsStateBackend("file:///Users/myPc/word-count-beam/src/checkpoint/");
}
}
这将创建一个 checkpointDir,您还需要设置一个 checkpointinginterval 值才能启用检查点。
【解决方案2】:
我知道它很旧,但想同意你的回答。
我们在 2019 年构建了一个 dockerized flink,并使用这些选项进行梁和运行
--runner=FlinkRunner --streaming=true --checkpointingInterval=30000 --env=dev
我们在conf.yml中配置了rocksdb作为后端。