【问题标题】:Apache Beam TextIO.ReadAll How to emit KeyValue instead of String of PcollectionApache Beam TextIO.ReadAll 如何发出 KeyValue 而不是 Pcollection 的字符串
【发布时间】:2025-12-21 06:20:09
【问题描述】:

管道从 PUBSUBIo 读取开始。 PubSub IO 中的消息是 GCS 文件路径。我知道我可以使用ReadAll() 从每个路径发出行。但是,它不符合我的目的(有关文件路径的信息丢失)。我需要的是发出一个KV<'Filepath','Lines inside files'>

PubSUB 消息看起来像

Message1 -> gs://folder1/Topic1/topicfile1.gz
Message2 -> gs://folder1/Topic2/topicfile2.gz

假设文件内容如下

topicfile1.gz
{
topic1.line1
topic1.line2
}

topicfile2.gz
{
topic2.line1
topic2.line2
}

我期待的是一个像下面这样的 pcollection

{KV<'gs://folder1/Topic1/topicfile1.gz','topic1.line1'>}
{KV<'gs://folder1/Topic1/topicfile1.gz','topic1.line2'>}
{KV<'gs://folder1/Topic2/topicfile2.gz','topic2.line1'>}
{KV<'gs://folder1/Topic2/topicfile2.gz','topic2.line2'>}

我找不到从 ParDo 函数内的路径读取文件以将路径映射到行的方法。

希望这很清楚。

【问题讨论】:

  • 您能否重新表述一下这个问题并分享一个您要解决的问题的示例?目前尚不清楚您到底在做什么,什么不起作用。
  • 例如目前尚不清楚您是从 Pubsub 主题还是某些文件中阅读。 TextIO 和它有什么关系。
  • 为问题添加了更好的解释。希望对你有帮助

标签: java apache-beam


【解决方案1】:

如果我正确理解问题,我认为 TextIO 开箱即用不支持此功能。

详情

当您应用 readAll() 之类的转换时,在从 IO 获取初始文件路径和最终从所有文件发出所有行之间涉及几个步骤。

例如逻辑in TextIO:

  • 它接受PCollection 的文件路径(或路径模式);
  • 它应用FileIO.matchAll(),将路径模式的PCollection 转换为描述这些路径的MatchResult.Metadata 对象的PCollection
  • 然后它应用FileIO.readMatches() 将元数据对象转换为描述特定文件的ReadableFile 对象;
  • 最后它应用TextIO.readFiles() 接受ReadableFile 并输出该文件中的所有字符串;
    • 在这最后一步中,您可能希望在输出中添加一个文件路径,以便您知道哪个字符串来自哪个文件。如果可以选择将最后一步更改为发出KV&lt;ReadableFile, String&gt; 而不仅仅是字符串,这样您就可以使用ReadableFile.metadata 访问文件路径,这会有什么帮助。

环顾该代码,似乎从文件中发出原始行是目前使用TextIO 唯一受支持的处理方式。

解决方法

可能最直接的方法是编写自己的PTransform,类似于TextIO.ReadAll。这将像这样工作:

高级:

  • 创建和自定义您自己的版本TextIO.ReadAll
  • ReadAllViaFileBasedSource;
  • 更改您的 ReadAllViaFileBasedSource 版本以发出您想要的内容;
  • 使用这个自定义版本的TextIO.ReadAll,它使用你的自定义版本ReadAllViaFileBasedSource,它会发出正确的东西;

稍微详细一点:

【讨论】:

  • 感谢您的冗长解释。我正在尝试这样做,但某些功能出现错误。例如createForSubrangeOfFile(org.apache.beam.sdk.io.fs.MatchResult.Metadata, long, long)' has protected access in 'org.apache.beam.sdk.io.FileBasedSource' 这是 ReadAllViaFileBasedSource 中的一个函数。你知道在这种情况下我应该做什么吗?对不起,我是新来的。
最近更新 更多