【问题标题】:Flink: How to persist and recover a ValueStateFlink:如何持久化和恢复一个 ValueState
【发布时间】:2018-09-28 07:46:42
【问题描述】:

我使用 Flink 来丰富输入流

case class Input( key: String, message: String )

预计算分数

case class Score( key: String, score: Int )

并产生输出

case class Output( key: String, message: String, score: Int )

输入流和分数流都从 Kafka 主题中读取,生成的输出流也发布到 Kafka

val processed = inputStream.connect( scoreStream )
                           .flatMap( new ScoreEnrichmentFunction )
                           .addSink( producer )

具有以下 ScoreEnrichmentFunction:

class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output]
{
    val scoreStateDescriptor = new ValueStateDescriptor[Score]( "saved scores", classOf[Score] )
    lazy val scoreState: ValueState[Score] = getRuntimeContext.getState( scoreStateDescriptor )

    override def flatMap1( input: Input, out: Collector[Output] ): Unit = 
    {
        Option( scoreState.value ) match {
            case None => out.collect( Output( input.key, input.message, -1 ) )
            case Some( score ) => out.collect( Output( input.key, input.message, score.score ) )  
        }
    }

    override def flatMap2( score: Score, out: Collector[Output] ): Unit = 
    {
        scoreState.update( score )
    } 
}

这很好用。但是,如果我采取安全点并取消 Flink 作业,则当我从保存点恢复作业时,存储在 ValueState 中的分数会丢失。

据我了解,ScoreEnrichmentFunction 似乎需要使用 CheckPointedFunction 进行扩展

class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output] with CheckpointedFunction

但我很难理解如何实现方法 snapshotState 和 initializeState 以使用键控状态

override def snapshotState( context: FunctionSnapshotContext ): Unit = ???


override def initializeState( context: FunctionInitializationContext ): Unit = ???

请注意,我使用以下环境:

val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism( 2 )
    env.setBufferTimeout( 1 )
    env.enableCheckpointing( 1000 )
    env.getCheckpointConfig.enableExternalizedCheckpoints( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION )
    env.getCheckpointConfig.setCheckpointingMode( CheckpointingMode.EXACTLY_ONCE )
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints( 500 )
    env.getCheckpointConfig.setCheckpointTimeout( 60000 )
    env.getCheckpointConfig.setFailOnCheckpointingErrors( false )
    env.getCheckpointConfig.setMaxConcurrentCheckpoints( 1 )

【问题讨论】:

  • 这看起来应该可以。仅供参考,snapshotState 和 initializeState 用于非键控状态,并且不与键控状态一起使用(我看不到您正在对流进行键控,但我假设您在未共享的代码中这样做)。您如何使用保存点重新启动,您如何确定状态未恢复?
  • 另外:您是尝试从保存点恢复,还是从外部检查点恢复?
  • 确实,scoreStream 和 inputStream 是键控的。为了检查状态是否已加载,我在输出流(输出 Kafka 主题)中检查了 Output.score 的值。如果它与 -1 不同,我知道分数已正确加载并且浓缩是可以的。
  • 我进行如下操作:我使用“bin/flink run myjar.jar”开始工作,我将分数发送到 kafka(分数主题),然后我发送输入(输入主题),然后我检查输出是否正常(输出主题)。然后我用“bin/flink cancel -s [:targetDirectory] ​​:jobId”取消作业,然后用“./bin/flink run myjar.jar -s my-save-point-path”恢复它。那时,我在输入主题上发送一系列新的输入,并检查输出主题。
  • 您使用的是哪个州的后端?

标签: scala apache-flink savepoints


【解决方案1】:

我想我找到了问题所在。我试图为检查点和保存点使用单独的目录,这导致保存点目录和 FsStateBackend 目录不同。

使用同一个目录

val backend = new FsStateBackend( "file:/data", true )
env.setStateBackend( backend )

以及在获取保存点时

bin/flink cancel d75f4712346cadb4df90ec06ef257636 -s file:/data

解决问题。

【讨论】:

  • 这只能通过命令行完成,或者你也可以在 IDE 上完成?我试过了,但初始状态仍然没有向 ListState 获取任何数据
猜你喜欢
  • 1970-01-01
  • 2015-11-11
  • 1970-01-01
  • 1970-01-01
  • 2021-02-23
  • 1970-01-01
  • 2015-08-16
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多