【问题标题】:Trigger Dataflow job on file arrival in GCS using Cloud Composer使用 Cloud Composer 在文件到达 GCS 时触发 Dataflow 作业
【发布时间】:2021-10-30 09:55:25
【问题描述】:

我是 GCP 的新手,正在努力更好地了解 Google Cloud 提供的 ETL 服务。我正在尝试做一个 POC,只要文件到达 GCS 中的指定位置,数据流作业就会运行,我想使用 Cloud Composer。

如何在 DAG 中使用不同的气流操作符(一个用于在 GCS 中监视文件,另一个用于触发数据流作业)?

【问题讨论】:

    标签: python-3.x google-cloud-platform airflow google-cloud-dataflow google-cloud-composer


    【解决方案1】:
    1. 当文件创建/到达您的存储桶时触发创建云函数。您可以在以下文档中找到如何部署此类功能https://cloud.google.com/functions/docs/calling/storage#deploy
    2. 编写你的函数来触发你的 DAG,在下面的文档中有很好的解释:https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf
    3. 使用以下任一气流运算符将数据流作业作为 DAG 中的任务运行
    • DataflowCreateJavaJobOperator 如果您使用的是 Java SDK
    • DataflowCreatePythonJobOperator 如果您使用的是 Python SDK

    更多细节在这里:https://airflow.apache.org/docs/apache-airflow-providers-google/2.1.0/operators/cloud/dataflow.html


    这就是你关于作曲家问题的答案,但是你应该考虑用Cloud Workflows替换作曲家,而不是使用气流数据流运算符,你可以使用Cloud Workflows dataflow connector,你可以对您的云函数进行编码以触发 Cloud Workflows 执行。有许多不同语言的客户端库可以做到这一点。这里是选择你的图书馆的链接:https://cloud.google.com/workflows/docs/reference/libraries

    与 Composer 相比,最后是 Cloud Workflows 专业人士。

    • 它是无服务器的:您不必担心机器类型、网络……

    • 您使用 yaml 轻松快速地描述您的工作流程,因此您不必担心学习气流

    • 比作曲家便宜很多

    • ...

    【讨论】:

    • 嗨 Phoenix,感谢您的回复,我肯定也会尝试一下,但主要是我要寻找的是如何配置两个不同的气流操作器,一个是文件传感器在 gcs 和其他上上传以触发 java 数据流作业,我无法理解如何将这两个运算符链接在一起,以便数据流运算符在文件传感器运算符发现新文件已上传时执行数据流作业
    • 恕我直言,我描述的解决方案将帮助您摆脱传感器。此类传感器将每 n 秒使用一次 pplling,但触发的 Cloud Function 更具反应性,并将尽快触发您的 dag,文件添加到您的存储桶中。
    • 好的,知道了。将对此进行测试
    猜你喜欢
    • 1970-01-01
    • 2019-06-06
    • 1970-01-01
    • 2020-12-10
    • 2021-08-13
    • 2021-03-14
    • 1970-01-01
    • 2021-11-01
    • 2022-07-22
    相关资源
    最近更新 更多