【问题标题】:How to trigger a task externally in a dag cloud composer如何在 dag 云作曲家中从外部触发任务
【发布时间】:2021-03-23 12:18:04
【问题描述】:

我想要一个基本上看起来像这样的数据管道

其中多个任务由相应的 pubsub 消息触发,处理来自 pubsub 消息输入的数据,最后一个任务只有在所有这些工作流完成后才会触发。我设法使用 PubSub 触发了整个 DAG(在此 guide 之后对 PubSub 进行了修改),但它触发了整个 DAG,而不是单个任务。有没有办法只在外部触发 DAG 中的 1 个任务(来自 Cloud Function/PubSub?)

编辑

这是我认为的 DAG 代码的简化版本:

import google.cloud.bigquery as bigquery

import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators import python_operator
from airflow.operators import dummy_operator


def task1_1(**kwargs):
    # I want this function to take the table name of source 1 from pubsub1, reads the table from BigQuery and processes it
    client_bq = bigquery.Client()
    table_name = kwargs['dag_run'].conf.get('message')
    data = client_bq.query(f"SELECT * FROM {table_name}").result().to_dataframe()
    # ETL Code
    # ..... 


def task2_1(**kwargs):
    # I want this function to take the table name of source 2 from pubsub2, reads the table from BigQuery and processes it
    client_bq = bigquery.Client()
    table_name = kwargs['dag_run'].conf.get('message')
    data = client_bq.query(f"SELECT * FROM {table_name}").result().to_dataframe()
    # ETL Code
    # ..... 

def task_combine():
    # This task is triggered when task1_1 and task2_1 are done
    # More ETL code


with DAG(
        'clean_am_workflow',
        schedule_interval=None,
        start_date=datetime.datetime.today() - datetime.timedelta(days=5),
        catchup=False) as dag:

    source_1 = python_operator.PythonOperator(
        task_id='process_source_1',
        python_callable=task1_1,
        provide_context=True
        )

    source_2 = python_operator.PythonOperator(
        task_id='process_source_2',
        python_callable=task2_1,
        provide_context=True
        )

    combine = python_operator.PythonOperator(
        task_id='combine_sources',
        python_callable=task_combine,
        provide_context=True
        )

    [source_1, source_2] >> combine

【问题讨论】:

  • 您好,请修复您问题中的链接,并分享您的 dag。
  • @PeterRing 我修复了它并添加了代码,但我不知道它是否有很大帮助。这就是我希望 DAG 的样子。是否也需要查看触发 DAG 的 Cloud Function 函数?

标签: google-cloud-functions airflow google-cloud-pubsub google-cloud-composer


【解决方案1】:

您需要的不是触发dag本身,而是根据bigquery分别触发不同的任务。这可以通过气流传感器来实现。 https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/index.html SQL 传感器: https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/sensors/sql.html

在这种情况下,dag 将由正常的 cron 触发。 2 传感器任务将定期查询 bigquery,如果该查询返回“good to go”,那么它将启动任务。因为 2 传感器是独立的,所以最后一个任务只有在传感器和任务都完成时才会执行。

【讨论】:

  • 感谢您的回答!气流传感器可以感应到 bigquery 数据集中新创建的表吗?
猜你喜欢
  • 1970-01-01
  • 2019-12-23
  • 1970-01-01
  • 2019-01-29
  • 1970-01-01
  • 2020-06-07
  • 1970-01-01
  • 2020-04-19
  • 2021-11-06
相关资源
最近更新 更多