【发布时间】:2021-04-03 07:27:32
【问题描述】:
我希望能够在单个 PCollection 元素中维护一组实体,但同时从 Google Cloud Storage (GCS) 获取这些实体。即PCollection<Iterable<String>> --> PCollection<Iterable<String>> 其中起始PCollection 是文件路径的可迭代,而生成的PCollection 是文件内容的可迭代。或者,PCollection<String> --> PCollection<Iterable<String>> 也可以工作,甚至可能更可取,其中起始 PCollection 是 glob 模式,而生成的 PCollection 是与 glob 匹配的文件内容的可迭代。
我的用例是在我的管道中的某个点,我有输入 PCollection<String>。 PCollection 的每个元素都是一个 GCS 全局模式。将与 glob 匹配的文件分组在一起很重要,因为文件的内容(一旦读取了组中的所有文件)需要在管道的下游进行分组。我最初尝试使用 FileIO.matchAll 和随后的 GroupByKey 。但是,matchAll、window 和GroupByKey 组合无法保证在执行 GroupByKey 转换之前,所有匹配 glob 的文件都将在同一个窗口中被读取(尽管我可能误解了 Windowing)。如果应用较大的时间跨度WindowFn,则有可能达到预期的结果,但它仍然是概率性的,而不是保证在分组之前将读取所有文件。保持尽可能低的延迟也是我的管道的主要目标。
所以我的下一个并且目前正在运行的计划是使用AsyncHttpClient 来扇出通过 GCS HTTP API 获取文件内容。我觉得这违背了 Beam 中的原则,并且在并行化方面可能不是最佳的。
所以我开始研究 SplittableDoFn 。我目前的计划是允许拆分,以便输入 Iterable 中的每个实体(即 glob 模式中的每个匹配文件)都可以单独处理。我已经能够修改 FileIO#MatchFn (defined here in the Java SDK) 以提供 PCollection<String> -> PCollection<Iterable<String>> 在 GCS glob 模式的输入和 glob 匹配的 Iterable 输出之间转换的机制。
我遇到的挑战是:如何在 DoFn 中将拆分调用分组/收集回单个输出值?我尝试过使用有状态处理并使用BagState 来收集文件内容,但我部分意识到可拆分DoFn 的ProcessElement 方法可能只接受ProcessContext 和Restriction 元组, 并且没有其他 args 因此没有 StateId args 引用 StateSpec (在运行时引发无效参数错误)。
我注意到在官方SDF proposal doc 的FilePatternWatcher 示例中创建了一个自定义跟踪器,其中FilePath 对象保存在一个集合中,并且可能通过tryClaim 添加到集合中。这似乎适用于我的用例,但我不明白/不明白如何使用自定义 RestrictionTracker 来实现 @SplitRestriction 方法。
如果有人能够提供建议,我将非常感激。我对任何特定的解决方案没有偏好,只是我想实现在单个 PCollection 元素中维护一组实体的能力,同时从 Google Cloud Storage (GCS) 中并行获取这些实体。
【问题讨论】:
标签: google-cloud-dataflow apache-beam