【Question title】: Flink does not checkpoint, and BucketingSink leaves files in pending state, when source is generated from Collection
【Posted】: 2018-04-05 03:55:27
【Description】:

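I am trying to generate some test data from a collection and write it to S3. When I do this, Flink does not seem to take any checkpoints at all, but it does checkpoint when the source reads from S3.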

For example, this job does checkpoint and leaves the output files in a finished state:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

val lines: DataStream[String] = {
  val path = "s3a://my_bucket/simple_job/in"
  env
    .readFile(
      inputFormat = new TextInputFormat(new Path(path)),
      filePath = path,
      watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
      interval = 5000L
    )
}

val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)

env.execute()

Meanwhile, this job does not checkpoint, and leaves the files in the .pending state even after the job has finished:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

val lines: DataStream[String] = env.fromCollection((1 to 100).map(_.toString))

val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)

env.execute()

【Discussion】:

    Tags: java scala apache-flink


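    【Answer 1】:

    It turns out this is caused by this ticket: https://issues.apache.org/jira/browse/FLINK-2646. The stream built from the collection simply finishes before the application has had time to take a single checkpoint, so the pending files are never finalized.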
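
    One possible workaround for test jobs, sketched below, is to replace `env.fromCollection` with a source that emits the same elements and then stays alive for a while, so that periodic checkpoints still have a chance to run. This is only an illustration under assumptions: `LingeringCollectionSource` and its `lingerMillis` parameter are hypothetical names, not part of Flink, and the right linger time depends on the checkpoint interval and on how quickly the sink rolls its files.

    ```scala
    import org.apache.flink.streaming.api.functions.source.SourceFunction

    // Hypothetical helper (not part of Flink): emits a fixed collection, then
    // keeps the source running for `lingerMillis` so checkpoints can be taken.
    class LingeringCollectionSource(elements: Seq[String], lingerMillis: Long)
        extends SourceFunction[String] {

      // Flipped by cancel() so the wait loop exits promptly on cancellation.
      @volatile private var running = true

      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        // Emit while holding the checkpoint lock, as recommended for sources.
        ctx.getCheckpointLock.synchronized {
          elements.foreach(e => ctx.collect(e))
        }
        // Keep the source (and therefore the job) alive after the last element.
        val deadline = System.currentTimeMillis() + lingerMillis
        while (running && System.currentTimeMillis() < deadline) {
          Thread.sleep(100L)
        }
      }

      override def cancel(): Unit = {
        running = false
      }
    }
    ```

    Assuming the rest of the job from the question is unchanged, it could be wired in as `val lines: DataStream[String] = env.addSource(new LingeringCollectionSource((1 to 100).map(_.toString), 60000L))`, where the 60-second value is arbitrary. The linger period has to be long enough for the sink to move its in-progress files to pending and for a later checkpoint to complete; otherwise the files will still be left in the pending state.
    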
