【问题标题】:How to read Text file and returns additional input field using TextIO?如何使用 TextIO 读取文本文件并返回附加输入字段?
【发布时间】:2019-03-01 06:46:33
【问题描述】:

我有一个 KV 的 PCollection,其中键是文件名,值是文件的一些附加信息(例如,生成文件的“源”系统)。例如,

KV("gs://bucket1/dir1/X1.dat", "SourceX"),
KV("gs://bucket1/dir2/Y1.dat", "SourceY")

我需要从文件中读取所有行并使用“源”字段,以 KV PCollection 的形式返回。

KV(line1 from X1.dat, "SourceX")
KV(line2 from X1.dat, "SourceX")
...
KV(line1 from Y1.dat, "SourceY")

我可以通过调用 FileIO.match() 并后跟一个 DoFn 来实现这一点,在该 DoFn 中我顺序读取文件并附加 SourceX(从 SideInput 中传递的映射中检索)。

为了获得并行阅读的好处,我可以使用 TextIO.readAll() 来实现这一点吗? TextIO.read() 返回一个 PCollection,没有文件名信息。我怎样才能将它加入到文件名到源映射的映射中?尝试了 WithKeys 传输,但无法正常工作...

【问题讨论】:

  • 在哪里可以找到答案?

标签: google-cloud-platform google-cloud-dataflow apache-beam dataflow


【解决方案1】:

目前使用FileIO.match() 是实现此目的的最佳方式,但一旦https://github.com/apache/beam/pull/12645 被合并,您将能够使用新的ContextualTextIO 转换。

请注意,以分布式方式计算行号本质上是昂贵的;您可能想看看是否可以使用偏移量(计算起来更容易,并且与行号的顺序相同)。

【讨论】:

    【解决方案2】:

    如果我理解正确,您想并行读取文件吗?不幸的是,TextIO.readAll 没有这个功能。您必须使用FileIO.match,然后编写您的DoFn 以按照您想要的自定义方式读取文件。

    这是因为您将无法对文件进行随机搜索并保留行号数。

    连续读取文件是您管道的瓶颈吗?

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-08-26
      • 2012-02-04
      • 2010-10-24
      • 1970-01-01
      • 2013-03-26
      • 1970-01-01
      • 1970-01-01
      • 2016-04-02
      相关资源
      最近更新 更多