【发布时间】:2018-04-05 03:55:27
【问题描述】:
我正在尝试使用集合生成一些测试数据,并将该数据写入 s3,当我这样做时,Flink 似乎根本没有做任何检查点,但是当源来自 s3 时它确实会做检查点.
例如,这个做检查点并使输出文件处于完成状态:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))
val lines: DataStream[String] = {
val path = "s3a://my_bucket/simple_job/in"
env
.readFile(
inputFormat = new TextInputFormat(new Path(path)),
filePath = path,
watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
interval = 5000L
)
}
val sinkFunction: BucketingSink[String] =
new BucketingSink[String]("s3a://my_bucket/simple_job/out")
.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
lines.addSink(sinkFunction)
env.execute()
同时,这不会检查点,并且即使在作业完成后也会使文件处于 .pending 状态:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(2000L)
env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))
val lines: DataStream[String] = env.fromCollection((1 to 100).map(_.toString))
val sinkFunction: BucketingSink[String] =
new BucketingSink[String]("s3a://my_bucket/simple_job/out")
.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
lines.addSink(sinkFunction)
env.execute()
【问题讨论】:
标签: java scala apache-flink