【发布时间】:2022-01-07 04:45:45
【问题描述】:
我正在使用一些 Google Cloud Platform 服务(Dataflow、Cloud Storage、PubSub)并且有以下场景:
- 许多应用将 GCS 上 CSV 文件的路径发布到 PubSub 主题。
- 在用 Python 编写的流式 Beam 管道中,我们
beam.io.ReadFromPubSub并在 DoFn 中将每个输入文件作为一个整体进行处理(每个process(elem)调用都会获得一条 PubSub 消息)。 - 这在大多数情况下都可以正常工作,但是随着文件的增长,我们希望通过使用 Dataframe API (
read_csv(...).to_pcollection(...)) 读取文件来并行处理文件,使用ParDo处理文件中的每条记录,然后传递原始文件路径作为后续阶段的辅助输入。
这里的问题是ReadFromPubSub 导致了一个无限的 PCollection,我不明白它是如何触发处理的。只要每个阶段为每个输入返回一个输出,简单的ReadFromPubSub | ProcessFile 管道就可以工作,但我需要确保ReadFromPubSub 输出包含恰好一条消息的窗口,以便为 Dataframe API 阅读器提供输入文件路径。
我认为最好的方法是使用全局窗口和 AfterCount 触发器,但它至少会触发 N 个元素,不完全是 - 似乎没有办法强制在第一个元素上触发离开。有没有办法强制执行单元素窗口?
【问题讨论】:
-
你能分享你的windows定义吗?您是否想过使用文件名作为会话 ID 创建会话窗口?
-
我想使用 Dataframe API,所以我认为我只需要将路径作为 Dataframe API 的侧面输入和
file_path = pvalue.AsSingleton(file_path_pcoll)的后续阶段,然后使用它elems = pipeline | read_csv(file_path).to_pcollection()就这样? -
至于会话窗口,不确定它应该如何工作,因为所有 API 都期望某种时间戳/时间窗口将内容放入其中。
-
不幸的是,我认为您不能以这种方式使用 DataFrame API 的 read_csv,它目前只接受字符串文件路径(可能是带有 *s 的 glob)。我们可以对其进行修改以允许您将 PCollection
与文件路径一起使用,但我们需要一种在管道构建时读取数据样本的方法。 -
我明白了......好吧,我会用讨厌的方式来做,只在文件级别进行并行化。谢谢!
标签: python apache-beam google-cloud-pubsub apache-beam-io