【问题标题】:Move files to another GCS folder and perform actions after an apache beam pipeline has been executed将文件移动到另一个 GCS 文件夹并在执行 apache 光束管道后执行操作
【发布时间】:2020-01-21 11:53:46
【问题描述】:

我创建了一个流式 apache 光束管道,它从 GCS 文件夹中读取文件并将它们插入 BigQuery,它运行良好,但是当我停止并运行作业时它会重新处理所有文件,因此所有数据将再次被复制。

所以我的想法是将文件从扫描的目录移动到另一个目录,但我不知道如何在技术上使用 apache Beam。

谢谢


public static PipelineResult run(Options options) {
// Create the pipeline.

        Pipeline pipeline = Pipeline.create(options);

        /*
         * Steps:
         *  1) Read from the text source.
         *  2) Write each text record to Pub/Sub
         */

        LOG.info("Running pipeline");
        LOG.info("Input : " + options.getInputFilePattern());
        LOG.info("Output : " + options.getOutputTopic());

        PCollection<String> collection = pipeline
                .apply("Read Text Data", TextIO.read()
                        .from(options.getInputFilePattern())
                        .watchForNewFiles(Duration.standardSeconds(60), Watch.Growth.<String>never()))

                .apply("Write logs", ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        LOG.info(c.element());
                        c.output(c.element());
                    }
                }));

        collection.apply("Write to PubSub", PubsubIO.writeStrings().to(options.getOutputTopic()));

        return pipeline.run();
    }

【问题讨论】:

  • 您的目录是否不断接收新文件?您是否希望在此管道运行时保持其运行状态?还是您想每周/每天/每月/等运行一次?
  • @Pablo ,是的,我想保持这个管道实时处理流数据,所以如果文件现在被废弃,它将被直接处理,所以代码我的代码工作正常但是当我重新午餐时作业,它重新处理所有数据。我找到了一个解决方案,但它不起作用,解决方案是创建一个动态路径,但似乎 apache Beam 在开始工作时只评估一次代码并始终保持第一个生成的路径。
  • ```` 管道管道 = Pipeline.create(options);字符串路径 = "gs://dev_data/"+date.format(date).split("-")[0]+"/"+date.format(date).split("-")[1]+ "/"+date.format(date).split("-")[2]+"/*.gz"; PCollection 集合 = pipeline.apply("读取文本数据", TextIO.read().from(path).watchForNewFiles(Duration.standardSeconds(60), Watch.Growth.never())))) ;返回管道.run(); ``` @Pablo

标签: google-cloud-dataflow apache-beam dataflow apache-beam-io


【解决方案1】:

几个提示:

  • 通常不希望您停止并重新运行流式管道。流式管道更适合永久运行,如果您想更改逻辑,有时还会更新。
  • 不过,可以使用 FileIO 匹配多个文件,并在处理完这些文件后移动它们。

您可以像这样编写一个 DoFn 类:ReadWholeFileThenMoveToAnotherBucketDoFn,它将读取整个文件,然后将其移动到一个新的存储桶中。

    Pipeline pipeline = Pipeline.create(options);


    PCollection<FileIO.Match> matches = pipeline
            .apply("Read Text Data", FileIO.match()
                    .filepattern(options.getInputFilePattern())
                    .continuously(Duration.standardSeconds(60), 
                                  Watch.Growth.<String>never()));

    matches.apply(FileIO.readMatches())
           .apply(ParDo.of(new ReadWholeFileThenMoveToAnotherBucketDoFn()))
            .apply("Write logs", ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    LOG.info(c.element());
                    c.output(c.element());
                }
            }));

    ....

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-03
    相关资源
    最近更新 更多