【发布时间】:2019-12-16 22:59:25
【问题描述】:
在这个question,我们知道
PCollection<String> lines = p.apply(TextIO.read() .from("gs://some-bucket/many/files/*") .withHintMatchesManyFiles());使用此提示会导致转换以一种针对读取大数据而优化的方式执行 文件数:在这种情况下可以读取的文件数实际上是无限的, 并且与没有此提示相比,管道很可能会运行得更快、更便宜且更可靠。
但是,管道的步骤被以下代码卡住了
PCollection<String> lines = pipeline.apply("readDataFromGCS",
TextIO.read().from(sourcePath + "/prefix*")
.withHintMatchesManyFiles()
.watchForNewFiles(Duration.standardMinutes(1), Watch.Growth.never()));
每分钟大约有 10 ~ 30MB 的新文件上传到 GCS。
但是,我们尝试从GCS in pub/sub 读取文件,管道可以正常工作。
raw_event = p.apply("Read Sub Message", PubsubIO.readStrings().fromTopic(options.getTopic()))
.apply("Extract File Name", ParDo.of(new ExtractFileNameFn()))
.apply("Read file matchall", FileIO.matchAll())
.apply("Read file match", FileIO.readMatches())
.apply("Read file", TextIO.readFiles());
我在这里缺少什么吗?或者有没有其他方法可以更有效地从 GCS 读取大量文件?
我的管道的工作流程是从 GCS 读取数据并在数据处理后下沉到 Pub/Sub。
Beam 版本:2.16.0
【问题讨论】:
-
您是否使用 BQ 作为目的地?
-
@jjayadeep,接收器是 Pub/Sub
-
您使用的是什么版本的光束?另外,平均有多少个文件?
-
@jjayadeep,每 2 分钟大约有 8 ~ 10 个 zip 文件,文件大小为 10M 字节。我使用的梁版本是2.16
-
来自此链接 - stackoverflow.com/questions/45362108/… 看起来
withHintMatchesManyFiles的设计不是为了有效处理少数文件。您可以删除提示,看看这是否解决了您的问题?
标签: google-cloud-dataflow apache-beam