【问题标题】:如何将每个传入的 PubSub 消息窗口化到一个新窗口中?
【发布时间】:2022-01-07 04:45:45
【问题描述】:

我正在使用一些 Google Cloud Platform 服务(Dataflow、Cloud Storage、PubSub)并且有以下场景:

  • 许多应用将 GCS 上 CSV 文件的路径发布到 PubSub 主题。
  • 在用 Python 编写的流式 Beam 管道中,我们beam.io.ReadFromPubSub 并在 DoFn 中将每个输入文件作为一个整体进行处理(每个 process(elem) 调用都会获得一条 PubSub 消息)。
  • 这在大多数情况下都可以正常工作,但是随着文件的增长,我们希望通过使用 Dataframe API (read_csv(...).to_pcollection(...)) 读取文件来并行处理文件,使用 ParDo 处理文件中的每条记录,然后传递原始文件路径作为后续阶段的辅助输入。

这里的问题是ReadFromPubSub 导致了一个无限的 PCollection,我不明白它是如何触发处理的。只要每个阶段为每个输入返回一个输出,简单的ReadFromPubSub | ProcessFile 管道就可以工作,但我需要确保ReadFromPubSub 输出包含恰好一条消息的窗口,以便为 Dataframe API 阅读器提供输入文件路径。

我认为最好的方法是使用全局窗口和 AfterCount 触发器,但它至少会触发 N 个元素,不完全是 - 似乎没有办法强制在第一个元素上触发离开。有没有办法强制执行单元素窗口?

【问题讨论】:

  • 你能分享你的windows定义吗?您是否想过使用文件名作为会话 ID 创建会话窗口?
  • 我想使用 Dataframe API,所以我认为我只需要将路径作为 Dataframe API 的侧面输入和file_path = pvalue.AsSingleton(file_path_pcoll) 的后续阶段,然后使用它elems = pipeline | read_csv(file_path).to_pcollection() 就这样?
  • 至于会话窗口,不确定它应该如何工作,因为所有 API 都期望某种时间戳/时间窗口将内容放入其中。
  • 不幸的是,我认为您不能以这种方式使用 DataFrame API 的 read_csv,它目前只接受字符串文件路径(可能是带有 *s 的 glob)。我们可以对其进行修改以允许您将 PCollection 与文件路径一起使用,但我们需要一种在管道构建时读取数据样本的方法。
  • 我明白了......好吧,我会用讨厌的方式来做,只在文件级别进行并行化。谢谢!

标签: python apache-beam google-cloud-pubsub apache-beam-io


【解决方案1】:

Beam 没有直接的方法来执行此操作。最初你会得到一个PCollection<File>,它会被转换成一个PCollection<Element>。此时您可能会丢失有关文件名的信息,因此您需要明确跟踪。

我可以建议一些解决方法,但不确定这些是否完全符合您的用例。

  1. 在每条记录的输入数据中以某种方式包含文件名(或文件名的哈希)。这将提供一种从给定记录中确定文件名的直接方法
  2. 使用保留文件名的转换。例如,textio.ReadFromTextWithFilename
  3. 保留来自 的映射(例如在 Beam 状态下)并在需要时参考。当窗口关闭时,您将知道潜在的文件列表,但必须以某种方式进一步缩小到单个文件,以便在下游阶段将单个元素映射到文件名(使用您提到的 AfterCount 触发器可能会简化这一点)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2010-12-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-12-31
    • 1970-01-01
    相关资源
    最近更新 更多