【问题标题】:Dynamic task creation using decorators in Airflow 2在 Airflow 2 中使用装饰器创建动态任务
【发布时间】:2022-01-19 12:13:11
【问题描述】:

我想根据另一个任务的结果在流程中动态创建任务。是否可以通过以下方式做到这一点?

我可以通过将任务中的数据存储在一个临时文件中并在下一个任务中读取它来做到这一点 - 但我想只使用气流资源来解决这个问题。

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago


@task
def extract() -> dict:
   return {"1001": 301.27, "1002": 433.21, "1003": 502.22}


def load_inner(dict_value_fun):
   print("Value is: {}".format(str(dict_value_fun)))


default_args = {
    'start_date': days_ago(2),
    'email_on_failure': False,
    'depends_on_past': False,
    'owner': 'airflow',
    'retries': 1,
}


@dag(default_args=default_args, schedule_interval=None,)
def tutorial_taskflow_api_etl():
   order_data = extract()

   for key in order_data:
      @task(task_id='task_key_is_{}'.format(key))
      def load(dict_value: dict, key: str):
         load_inner(dict_value[key])

      order_data >> load(order_data, key)

tutorial_etl_dag = tutorial_taskflow_api_etl()

错误信息:

The key (dict_key_is_{{ task_instance.xcom_pull(task_ids='extract', dag_id='tutorial_taskflow_api_etl', key='0') }}) has to be made of alphanumeric characters, dashes, dots and underscores exclusively

【问题讨论】:

    标签: python-decorators airflow-2.x


    【解决方案1】:

    我发现使用气流资源创建动态任务是不可能的,因为 DAG 不是在一个系统进程中运行的常规 Python 文件。 Airflow 环境通常与几个工作人员一起工作,因此可能会在不同的服务器上独立执行任务。为了能够动态创建任务,我们必须使用 GCS、数据库或 Airflow 变量等外部资源。

    我必须使用气流变量来解决我的问题: 您可以在此处查看代码:

    from airflow.models import Variable
    from airflow.decorators import dag, task
    from airflow.utils.dates import days_ago
    import logging
    import json
    
    
    @task
    def extract() -> dict:
       return {"1001": 301.27, "1002": 433.21, "1003": 502.22}
    
    @task
    def set_variables(val: dict):
        # before running dag we have to set a variable
        variable_val = Variable.get("dynamic_dictionary_variable_value")
        logging.info("current variable value (before change) is " + str(variable_val))
    
        logging.info("updated value " + str(val))
    
        # we have to change dict type into string otherwise we get error about wrong quotation
        Variable.set("dynamic_dictionary_variable_value", json.dumps(val))
    
        variable_val = Variable.get("dynamic_dictionary_variable_value")
        logging.info("current variable value (after change) is " + str(variable_val))
    
    
    default_args = {
        'start_date': days_ago(2),
        'email_on_failure': False,
        'depends_on_past': False,
        'owner': 'airflow',
        'retries': 1,
    }
    
    @dag(default_args=default_args,schedule_interval=None, start_date=days_ago(2))
    def dynamic_workflow():
        current_directory = Variable.get("dynamic_dictionary_variable_value")
        logging.info("(before) current_directory outside task is " + str(current_directory))
    
        values_directory = extract()
        return_val = set_variables(values_directory)
    
        current_directory = Variable.get("dynamic_dictionary_variable_value")
        logging.info("(after) current_directory outside task is " + str(current_directory))
    
        for key in json.loads(current_directory):
            @task(task_id='dynamic_task_{}'.format(key))
            def load(key_name: str, val: float):
                logging.info("key_name: {}, value: {}".format(key_name, val))
    
            return_val >> load(key, json.loads(current_directory)[key])
    
    dynamic_workflow_dag = dynamic_workflow()
    

    在运行这个 dag 之前,我们必须记住设置 Airflow Variable dynamic_dictionary_variable_value = {}。在我的示例中,我使用 {"50": 50.505}

    开头的 DAG 如下所示:

    执行后的样子:

    【讨论】:

    • 我们必须记住,这肯定不是生产解决方案,特别是如果我们希望在一组动态任务之后再执行另一个任务。如果我们增加动态任务的数量,当下一个任务开始执行其作业时,它们将不会被处理到最后——它不会等待父任务的成功,因为不知道它们——它会在气流刷新后学习,但它失败。
    猜你喜欢
    • 1970-01-01
    • 2011-12-25
    • 1970-01-01
    • 2017-06-19
    • 2023-02-03
    • 1970-01-01
    • 1970-01-01
    • 2018-07-27
    • 1970-01-01
    相关资源
    最近更新 更多