【发布时间】:2020-01-21 11:53:46
【问题描述】:
我创建了一个流式 apache 光束管道,它从 GCS 文件夹中读取文件并将它们插入 BigQuery,它运行良好,但是当我停止并运行作业时它会重新处理所有文件,因此所有数据将再次被复制。
所以我的想法是将文件从扫描的目录移动到另一个目录,但我不知道如何在技术上使用 apache Beam。
谢谢
public static PipelineResult run(Options options) {
// Create the pipeline.
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Read from the text source.
* 2) Write each text record to Pub/Sub
*/
LOG.info("Running pipeline");
LOG.info("Input : " + options.getInputFilePattern());
LOG.info("Output : " + options.getOutputTopic());
PCollection<String> collection = pipeline
.apply("Read Text Data", TextIO.read()
.from(options.getInputFilePattern())
.watchForNewFiles(Duration.standardSeconds(60), Watch.Growth.<String>never()))
.apply("Write logs", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info(c.element());
c.output(c.element());
}
}));
collection.apply("Write to PubSub", PubsubIO.writeStrings().to(options.getOutputTopic()));
return pipeline.run();
}
【问题讨论】:
-
您的目录是否不断接收新文件?您是否希望在此管道运行时保持其运行状态?还是您想每周/每天/每月/等运行一次?
-
@Pablo ,是的,我想保持这个管道实时处理流数据,所以如果文件现在被废弃,它将被直接处理,所以代码我的代码工作正常但是当我重新午餐时作业,它重新处理所有数据。我找到了一个解决方案,但它不起作用,解决方案是创建一个动态路径,但似乎 apache Beam 在开始工作时只评估一次代码并始终保持第一个生成的路径。
-
```` 管道管道 = Pipeline.create(options);字符串路径 = "gs://dev_data/"+date.format(date).split("-")[0]+"/"+date.format(date).split("-")[1]+ "/"+date.format(date).split("-")[2]+"/*.gz"; PCollection
集合 = pipeline.apply("读取文本数据", TextIO.read().from(path).watchForNewFiles(Duration.standardSeconds(60), Watch.Growth. never())))) ;返回管道.run(); ``` @Pablo
标签: google-cloud-dataflow apache-beam dataflow apache-beam-io