【问题标题】:Decompressing .tar file in Dataflow?在 Dataflow 中解压缩 .tar 文件?
【发布时间】:2020-08-13 15:58:10
【问题描述】:

我的 GCP 云存储桶中有很多 .tar 文件。每个 .tar 文件都有多个层。我想使用 GCP Dataflow 解压缩这些 .tar 文件并将它们放回另一个 GCP 存储桶。

我找到了 Google 提供的批量解压缩云存储文件的实用程序模板,但它不支持 .tar 文件扩展名。

也许我应该在上传到云端之前尝试解压缩文件,或者 Beam 中是否存在其他内容?

每个 tar 文件未压缩大约 15 TB。

【问题讨论】:

    标签: compression google-cloud-dataflow apache-beam tar


    【解决方案1】:

    这个 sn-p 借用了Bulk Decompress Template 的代码。它还借用了this quetstion&answer

    正如您所注意到的,不支持 TAR,但一般来说,Beam 中的压缩/解压缩似乎依赖于 Apache Commons' Compression libraries

    您将编写一个执行以下操作的管道:

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);
    
    // Run the pipeline over the work items.
    PCollectionTuple decompressOut =
        pipeline
            .apply("MatchFile(s)",
                FileIO.match().filepattern(options.getInputFilePattern()))
            .apply(
                "DecompressFile(s)",
                ParDo.of(new Decompress(options.getOutputDirectory());
    
    

    你的Decompress DoFn 看起来像这样:

    class Dearchive extends DoFn<MatchResult.Metadata, String> {
      @ProcessElement
      public void process(@Context ProcessContext context) {
        ResourceId inputFile = context.element().resourceId();
        String outputFilename = Files.getNameWithoutExtension(inputFile.toString());
        ResourceId tempFileDir =
              this.outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_DIRECTORY);
        
        TarArchiveInputStream tarInput = new TarArchiveInputStream(
            Channels.newInputStream(FileSystems.open(inputFile)));
    
        TarArchiveEntry currentEntry = tarInput.getNextTarEntry();
    
        while (currentEntry != null) {
            br = new BufferedReader(new InputStreamReader(tarInput)); // Read directly 
            ResourceId outputFile = tempFileDir.resolve(currentEntry.getName(), 
                StandardResolveOptions.RESOLVE_FILE);
            try (WritableByteChannel writerChannel = FileSystems.create(tempFile, MimeTypes.TEXT)) {
              ByteStreams.copy(tarInput, Channels.newOutputStream(writerChannel));
            }
            context.output(outputFile.toString());
            currentEntry = tarInput.getNextTarEntry(); // Iterate to the next file
        }
      }
    }
    

    这是一个非常粗略且未经测试的代码 sn-p,但它应该能让您走上正确的道路。如果我们应该进一步澄清,LMK。

    【讨论】:

    • +1,数据流模板不仅对直接运行有用,而且还提供了大量经过良好测试的管道,您可以根据需要进行修改。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-09-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-10-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多