【发布时间】:2021-09-03 07:38:43
【问题描述】:
我正在使用 flink v1.11.2 并尝试将我的 protobuf 数据下沉到 hdfs,我从 document 获取代码
我的代码正在遵循
val writer = ParquetProtoWriters.forTypeWithConf(classOf[RawSample], CompressionCodecName.GZIP)
val sinker = StreamingFileSink
.forBulkFormat(new Path(option.dumpOutputPath), writer)
.withBucketAssigner(new DateTimeBucketAssigner[RawSample]("yyyy-MM-dd/HH"))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withBucketCheckInterval(option.rolloverInterval)
.withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".gz.parquet").build())
.build()
我复制 ParquetProtoWriters 代码以支持 gzip 压缩,RawSample 是一个 protobuf 生成的类,它确实将文件接收到 hdfs,但文件名看起来像
└── 2019-08-25/12
├── part-0-0.gz.parquet
├── part-0-1.gz.parquet
├── ...
├── part-0-9.gz.parquet
└── 2019-08-25/13
├── part-0-10.gz.parquet
├── part-0-11.gz.parquet
├── ...
├── part-0-19.gz.parquet
└── 2019-08-25/14
├── part-0-20.gz.parquet
├── part-0-21.gz.parquet
├── ...
├── part-0-29.gz.parquet
零件文件的Part file configuration中的partFileIndex字段不断增长,有什么办法可以让它每小时从0开始,让它看起来像
└── 2019-08-25/12
├── part-0-0.gz.parquet
├── part-0-1.gz.parquet
├── ...
├── part-0-9.gz.parquet
└── 2019-08-25/13
├── part-0-0.gz.parquet
├── part-0-1.gz.parquet
├── ...
├── part-0-9.gz.parquet
└── 2019-08-25/14
├── part-0-0.gz.parquet
├── part-0-1.gz.parquet
├── ...
├── part-0-9.gz.parquet
【问题讨论】:
标签: apache-flink flink-streaming