【发布时间】:2020-03-04 23:09:23
【问题描述】:
我正在尝试使用 Apache Beam 在 GCP 中的多个文件上读取和应用一些子集。我准备了两个仅适用于一个文件的管道,但是当我在多个文件上尝试它们时失败了。除此之外,如果可能的话,我会很方便地将我的管道组合成一个,或者有办法编排它们以便它们按顺序工作。现在管道在本地工作,但我的最终目标是使用 Dataflow 运行它们。
我 textio.ReadFromText 和 textio.ReadAllFromText,但在多个文件的情况下我无法使两者都工作。
def toJson(file):
with open(file) as f:
return json.load(f)
with beam.Pipeline(options=PipelineOptions()) as p:
files = (p
| beam.io.textio.ReadFromText("gs://my_bucket/file1.txt.gz", skip_header_lines = 0)
| beam.io.WriteToText("/home/test",
file_name_suffix=".json", num_shards=1 , append_trailing_newlines = True))
with beam.Pipeline(options=PipelineOptions()) as p:
lines = (p
| 'read_data' >> beam.Create(['test-00000-of-00001.json'])
| "toJson" >> beam.Map(toJson)
| "takeItems" >> beam.FlatMap(lambda line: line["Items"])
| "takeSubjects" >> beam.FlatMap(lambda line: line['data']['subjects'])
| beam.combiners.Count.PerElement()
| beam.io.WriteToText("/home/items",
file_name_suffix=".txt", num_shards=1 , append_trailing_newlines = True))
这两个管道对于单个文件效果很好,但我有数百个相同格式的文件,想利用并行计算的优势。
有没有办法让这个管道适用于同一目录下的多个文件?
是否可以在单个管道中执行此操作而不是创建两个不同的管道? (将文件从桶中写入工作节点并不方便。)
【问题讨论】:
-
文件名中是否有一些元数据您试图保留到您正在写入的文件名? textio 支持 glob 模式,可以直接处理压缩类型。
-
@RezaRokni,感谢您的评论。你能给这个用例举个例子吗?我不明白。里面没有元数据。
-
我可能不了解您的用例,但您可以将 glob 模式与您的 text.io.ReadFromText("gs://my_bucket/*.txt') 一起使用。然后使用您的 beam.Map (toJson)。
-
beam 抱怨它需要读取 num_bytes 并且当我在读取中提供 num_bytes 时它说 JsonDecode 错误。
-
啊抱歉,我浏览了您的示例并错过了您尝试读取整个文件,而不是从中读取行。您是否已经尝试过使用 fileio 而不是 textio? textio 从由换行符分隔的文件中读取行。 fileio 生成代表文件及其元数据的记录集合
标签: python python-3.x apache-beam dataflow apache-beam-io