【发布时间】:2018-12-28 05:48:14
【问题描述】:
处理从多个文件夹中读取文件,然后使用 python sdk 和数据流运行器将具有文件名的文件内容(文件内容,文件名)输出到 apache Beam 中的 bigquery。
最初以为我可以为每个文件创建一个 pcollection,然后将文件内容与文件名映射。
def read_documents(pipeline):
"""Read the documents at the provided uris and returns (uri, line) pairs."""
pcolls = []
count = 0
with open(TESTIN) as uris:
for uri in uris:
#print str(uri).strip("[]/'")
pcolls.append(
pipeline
| 'Read: uri' + str(uri) >>ReadFromText(str(uri).strip("[]/'"), compression_type = 'gzip')
| 'WithKey: uri' + str(uri) >> beam.Map(lambda v, uri: (v, str(uri).strip("[]")), uri)
)
return pcolls | 'FlattenReadPColls' >> beam.Flatten()
这工作正常,但速度很慢,并且在大约 10000 个文件后无法在数据流云上工作。如果超过 10000 个左右的文件,它将遭受管道损坏。
目前正在尝试从 Text.io 重载 ReadAllFromText 函数。 Text.io 旨在从文件名或模式的集合中快速读取大量文件。如果从 Google 云存储读取并且文件具有内容编码,则此模块中存在错误。谷歌云存储会自动压缩文件并对其进行转码,但由于某种原因,ReadAllFromText 无法使用它。您必须更改文件的元数据以删除内容编码并将 ReadAllFromText 上的压缩类型设置为 gzip。我将包含此问题 url,以防其他人对 ReadAllFromText 有问题 https://issues.apache.org/jira/browse/BEAM-1874
我当前的代码如下所示
class ReadFromGs(ReadAllFromText):
def __init__(self):
super(ReadFromGs, self).__init__(compression_type="gzip")
def expand(self, pvalue):
files = self._read_all_files
return (
pvalue
| 'ReadAllFiles' >> files #self._read_all_files
| 'Map values' >> beam.Map( lambda v: (v, filename)) # filename is a placeholder for the input filename that im trying to figure out how to include in the output.
)
ReadAllFromText 包含在 Text.io 中,从 filebasedsource.py 调用 ReadAllText 并继承自 PTransform。
我相信我只是缺少一些简单的东西。
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/textio.py
【问题讨论】:
标签: python google-cloud-platform google-cloud-dataflow apache-beam