【问题标题】:How to perform checkpointing in apache beam while using flink runner?使用 flink runner 时如何在 apache Beam 中执行检查点?
【发布时间】:2020-10-26 07:47:25
【问题描述】:

我正在从一个未绑定的源(Kafka)阅读并将其字数写入其他 Kafka 主题。现在我想在光束管道中执行检查点。我已按照 apache beam 文档中的所有说明进行操作,但即使在此之后也没有创建检查点目录。

以下是我用于管道的参数:-

--runner=FlinkRunner
--streaming=true
--parallelism=2
--checkpointingInterval=1000
--checkpointTimeoutMillis=5000
--minPauseBetweenCheckpoints=500
--externalizedCheckpointsEnabled=true
--retainExternalizedCheckpointsOnCancellation=true

谁能帮我检查点?

【问题讨论】:

    标签: apache-flink apache-beam-io apache-beam apache-beam-kafkaio


    【解决方案1】:

    我已经研究过解决方案,一个是你可以更改链接集群的 flink-conf.yaml 中的 checkpoint.state.dir 路径,另一个是使用 flinkPipelineOptions-

            @Description(
                    "Sets the state backend factory to use in streaming mode. "
                            + "Defaults to the flink cluster's state.backend configuration.")
            Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
            void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);
    

    并通过设置 setStateBackendFactory(我已经使用自定义类完成了)

      static class  bakend implements FlinkStateBackendFactory{
    
            @Override
            public StateBackend createStateBackend(FlinkPipelineOptions options) {
                return new FsStateBackend("file:///Users/myPc/word-count-beam/src/checkpoint/");
    
            }
        }
    

    这将创建一个 checkpointDir,您还需要设置一个 checkpointinginterval 值才能启用检查点。

    【讨论】:

      【解决方案2】:

      我知道它很旧,但想同意你的回答。 我们在 2019 年构建了一个 dockerized flink,并使用这些选项进行梁和运行

      --runner=FlinkRunner --streaming=true --checkpointingInterval=30000 --env=dev
      

      我们在conf.yml中配置了rocksdb作为后端。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-09-16
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多