【问题标题】:Beam/Dataflow read Parquet files and add file name/path to each recordBeam/Dataflow 读取 Parquet 文件并将文件名/路径添加到每条记录
【发布时间】:2021-02-28 08:11:57
【问题描述】:

我正在使用 Apache Beam Python SDK,我正在尝试使用 apache_beam.io.parquetio 从 Parquet 文件中读取数据,但我还想将文件名(或路径)添加到数据中,因为它也包含数据。我查看了建议的模式here 并读到 Parquetio 与 fileio 相似,但它似乎没有实现允许遍历文件并将其添加到聚会的功能。

有人想出一个好方法来实现这个吗?

谢谢!

【问题讨论】:

  • 你使用哪个 SDK?
  • @AlexeyRomanenko - Python!

标签: google-cloud-dataflow apache-beam parquet


【解决方案1】:

如果文件的数量不是很大,可以先通过IO读取所有文件。

import glob

filelist = glob.glob('/tmp/*.parquet')
p = beam.Pipeline()

class PairWithFile(beam.DoFn):
    def __init__(self, filename):
        self._filename = filename

    def process(self, e):
        yield (self._filename, e)

file_with_records = [
    (p 
     | 'Read %s' % (file) >> beam.io.ReadFromParquet(file)
     | 'Pair %s' % (file) >> beam.ParDo(PairWithFile(file)))
    for file in filelist 
] | beam.Flatten()

那么你的 PCollection 看起来像这样:

【讨论】:

  • 谢谢!文件数量不多,但每个文件都很大。可以以这种方式并行读取文件(在遍历文件列表时)吗?在这里使用 Flatten 不会有惩罚吗?
  • 另外,这似乎不起作用 - 文件列表中的最后一项已添加到所有记录中
  • 读取是并行的。文件大小无关紧要,因为文件的读取仍然由 IO 连接器完成。列表理解不做任何工作。它只是构建管道的形状。所有的执行都在 Beam 中的管道构建之后延迟。 Flatten 将多个 PCollection 对象合并为一个逻辑 PCollection。那里应该没有太多的惩罚。
  • 我已经编辑了原始代码sn-p。原始代码的问题在于延迟执行仅在 beam.Map() 的 lambda 中考虑文件名的最后状态。通过更改,文件名被封装到 ParDo 的定义中,以便文件名可用于延迟执行。
  • 太棒了!谢谢!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-01-06
  • 1970-01-01
  • 2013-07-14
  • 1970-01-01
  • 1970-01-01
  • 2018-07-16
  • 1970-01-01
相关资源
最近更新 更多