【问题标题】:Flink StreamingFileSink forBulkFormat make partFileIndex keep growingFlink StreamingFileSink for BulkFormat 使 partFileIndex 不断增长
【发布时间】:2021-09-03 07:38:43
【问题描述】:

我正在使用 flink v1.11.2 并尝试将我的 protobuf 数据下沉到 hdfs,我从 document 获取代码

我的代码正在遵循


val writer = ParquetProtoWriters.forTypeWithConf(classOf[RawSample], CompressionCodecName.GZIP)

val sinker = StreamingFileSink
      .forBulkFormat(new Path(option.dumpOutputPath), writer)
      .withBucketAssigner(new DateTimeBucketAssigner[RawSample]("yyyy-MM-dd/HH"))
      .withRollingPolicy(OnCheckpointRollingPolicy.build())
      .withBucketCheckInterval(option.rolloverInterval)
      .withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".gz.parquet").build())
      .build()

我复制 ParquetProtoWriters 代码以支持 gzip 压缩,RawSample 是一个 protobuf 生成的类,它确实将文件接收到 hdfs,但文件名看起来像

└── 2019-08-25/12
    ├── part-0-0.gz.parquet
    ├── part-0-1.gz.parquet
    ├── ...
    ├── part-0-9.gz.parquet
└── 2019-08-25/13
    ├── part-0-10.gz.parquet
    ├── part-0-11.gz.parquet
    ├── ...
    ├── part-0-19.gz.parquet
└── 2019-08-25/14
    ├── part-0-20.gz.parquet
    ├── part-0-21.gz.parquet
    ├── ...
    ├── part-0-29.gz.parquet

零件文件的Part file configuration中的partFileIndex字段不断增长,有什么办法可以让它每小时从0开始,让它看起来像

└── 2019-08-25/12
    ├── part-0-0.gz.parquet
    ├── part-0-1.gz.parquet
    ├── ...
    ├── part-0-9.gz.parquet
└── 2019-08-25/13
    ├── part-0-0.gz.parquet
    ├── part-0-1.gz.parquet
    ├── ...
    ├── part-0-9.gz.parquet
└── 2019-08-25/14
    ├── part-0-0.gz.parquet
    ├── part-0-1.gz.parquet
    ├── ...
    ├── part-0-9.gz.parquet

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    很遗憾,不,partFileIndex 会全局递增以防止文件名重复

    【讨论】:

      猜你喜欢
      • 2020-06-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-05-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多