【问题标题】:Create dataflow job dynamically动态创建数据流作业
【发布时间】:2023-03-20 19:42:01
【问题描述】:

我是 Google Cloud 和 Dataflow 的新手。我想做的是创建一个工具来检测和恢复(大)csv文件中的错误。 但是在设计时,并不是每个应该处理的错误都是已知的。因此,我需要一种简单的方法来添加处理特定错误的新函数。

该工具应该类似于基于用户选择自动创建数据流模板的框架。我已经想到了一个可行的工作流程,但如前所述,我对此完全陌生,因此请随时提出更好的解决方案:

  1. 用户选择应该在前端使用哪些纠错方法
  2. 创建一个指定所选转换的 yaml 文件
  3. python 脚本解析 yaml 文件并使用错误处理函数来构建数据流作业,该作业执行 yaml 文件中指定的这些函数
  4. 数据流作业存储为模板,并通过 REST API 调用为存储在 GCP 上的文件运行

为了实现可扩展性,应该很容易地添加实现纠错的新功能。我想到的是:

  1. 开发者编写所需函数并上传到指定文件夹
  2. 新功能手动添加到前端/或数据库等,可以选择检查/处理错误
  3. 用户现在可以选择新添加的错误处理功能,并且正在创建的数据流模板使用此功能无需编辑构建数据流模板的代码

但是我的问题是我不确定这是否可行或是否是解决此问题的“好”解决方案。此外,我不知道如何创建一个使用在设计时未知的函数的 python 脚本。 (我想过使用类似策略模式的东西,但据我所知,您仍然需要在设计时实现功能,即使在运行时决定使用哪个功能) 任何帮助将不胜感激!

【问题讨论】:

  • 您好!您能否指定要在 CSV 文件中检测的错误类型?
  • 您好 Ines,我想检测和处理以下错误(该列表并非详尽无遗): |非转义分隔符 |文本列中的换行符(这将导致一个数据条目有两行)|错误的数据类型 [例如日期列中的字符串] |多个标题行 |重复的行...

标签: python google-cloud-platform google-cloud-dataflow apache-beam


【解决方案1】:

您可以在架构中使用 Cloud Functions 和 Cloud Composer(Airflow 的托管解决方案)。 Apache Airflow 旨在定期运行 DAG,但您也可以触发 DAG 以响应事件,例如 Cloud Storage 存储桶(可以存储 CSV 文件的位置)中的更改。您可以使用前端进行配置,每次新文件到达 Bucket 时,都会触发包含逐步过程的 DAG。

请查看official documentation,它描述了使用DataflowTemplateOperator 使用 Cloud Composer 启动 Dataflow 管道的过程。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-10-13
    • 2021-01-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多