【问题标题】:Flink streaming - Change part file names when using StreamingFileSink?Flink流 - 使用StreamingFileSink时更改部分文件名?
【发布时间】:2019-05-20 16:24:49
【问题描述】:

我正在尝试使用 Flink 流来使用 Kafka 主题消息并(定期)创建将保存在 s3 上的 parquet 文件。
有没有办法使用批量格式的流式文件接收器将创建的部分文件名(或添加后缀/前缀)更改为比 part-0-0 或 part-1-3 更独特?

StreamingFileSink<> sink = StreamingFileSink.forBulkFormat(new Path("s3://test-bucket/"),               ParquetAvroFactory.getParquetWriter(schema,  CompressionCodec.UNCOMPRESSED.name()))
.withBucketAssigner(new PartitionBucketAssigner(partitionColumns))
.build();

【问题讨论】:

  • 您对ParquetAvroFactory 的实施情况如何?
  • 你找到解决方案了吗?我正在寻找相同的内容以及如何避免在使用检查点重新启动我的应用程序时覆盖我在 s3 中的文件?
  • 如果您从检查点重新启动,则应该没有问题,因为该文件将包含检查点序列号的后缀。无论如何,Flink 在 1.10 版本中会有零件文件名的解决方案

标签: java apache-flink avro parquet


【解决方案1】:

您可以覆盖 BucketAssigner 上的 getBucketId 方法(请参阅https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html),这将影响路径,但显然不会影响部分文件名(请参阅下面的评论)。

零件文件名建立在org.apache.flink.streaming.api.functions.sink.filesystem.Bucket的这段代码中:

private Path assembleNewPartPath() {
    return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
}

这似乎不是为定制而设计的。

【讨论】:

  • 试过了,它只改变要写入文件的子目录,而不是文件名本身
  • 使用检查点重新启动我的应用程序时,如何避免覆盖我在 s3 中的文件?
  • @YitzchakLieberman 如果您从“保存点”重新启动,您将不会覆盖文件。如果您重新开始,您将覆盖文件。
猜你喜欢
  • 2020-06-08
  • 1970-01-01
  • 2020-09-20
  • 2019-12-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多