【问题标题】:Create dynamic workflows in Airflow with XCOM value在 Airflow 中创建具有 XCOM 价值的动态工作流
【发布时间】:2021-03-26 16:40:06
【问题描述】:

现在,我使用这样的变量创建多个任务,它工作正常。

with DAG(....) as dag:
    body = Variable.get("config_table", deserialize_json=True)
    for i in range(len(body.keys())):
        simple_task = Operator(
            task_id = 'task_' + str(i),
            .....

但由于某种原因我需要使用 XCOM 值而不是使用变量。 是否可以动态创建具有 XCOM 拉值的任务?

我尝试这样设置值,但它不起作用

body = "{{ ti.xcom_pull(key='config_table', task_ids='get_config_table') }}"

【问题讨论】:

    标签: airflow airflow-2.x


    【解决方案1】:

    可以从先前任务生成的XComs 动态创建任务,对此主题有更广泛的讨论,例如在此question 中。建议的方法之一遵循这种结构,这是我制作的一个工作示例:

    sample_file.json:

    {
        "cities": [ "London", "Paris", "BA", "NY" ]
    }
    
    • 从 API 或文件或任何来源获取数据。推送为XCom
    
    def _process_obtained_data(ti):
        list_of_cities = ti.xcom_pull(task_ids='get_data')
        Variable.set(key='list_of_cities',
                     value=list_of_cities['cities'], serialize_json=True)
    
    def _read_file():
        with open('dags/sample_file.json') as f:
            data = json.load(f)
            # push to XCom using return
            return data
    
    
    with DAG('dynamic_tasks_example', schedule_interval='@once',
             start_date=days_ago(2),
             catchup=False) as dag:
    
        get_data = PythonOperator(
            task_id='get_data',
            python_callable=_read_file)
    
    • 添加第二个任务,该任务将从 XCom 拉取并设置一个 Variable,其中包含您稍后将用于迭代的数据。
        preparation_task = PythonOperator(
            task_id='preparation_task',
            python_callable=_process_obtained_data)
    

    *当然,如果您愿意,您可以将两个任务合并为一个。我不想这样做,因为通常我会使用提取数据的一个子集来创建Variable

    • Variable 中读取,然后对其进行迭代。定义default_var关键
        end = DummyOperator(
            task_id='end',
            trigger_rule='none_failed')
    
        # Top-level code within DAG block
        iterable_list = Variable.get('list_of_cities',
                                     default_var=['default_city'],
                                     deserialize_json=True)
    
    • 在循环中声明动态任务及其依赖关系。使 task_id 唯一。 TaskGroup 是可选的,可帮助您对 UI 进行排序。
    
        with TaskGroup('dynamic_tasks_group',
                       prefix_group_id=False,
                       ) as dynamic_tasks_group:
            if iterable_list:
                for index, city in enumerate(iterable_list):
                    say_hello = PythonOperator(
                        task_id=f'say_hello_from_{city}',
                        python_callable=_print_greeting,
                        op_kwargs={'city_name': city, 'greeting': 'Hello'}
                    )
                    say_goodbye = PythonOperator(
                        task_id=f'say_goodbye_from_{city}',
                        python_callable=_print_greeting,
                        op_kwargs={'city_name': city, 'greeting': 'Goodbye'}
                    )
    
                    # TaskGroup level dependencies
                    say_hello >> say_goodbye
    
    # DAG level dependencies
    get_data >> preparation_task >> dynamic_tasks_group >> end
    
    

    DAG 图视图:

    进口:

    import json
    from airflow import DAG
    from airflow.utils.dates import days_ago
    from airflow.models import Variable
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.dummy import DummyOperator
    from airflow.utils.task_group import TaskGroup
    

    注意事项:

    • 如果您同时有同一个 DAG 的 dag_runs,它们都将使用相同的变量,因此您可能需要通过区分它们的名称来使其“唯一”。
    • 读取Variable时必须设置默认值,否则第一次执行可能无法处理到Scheduler
    • Airflow 图表视图 UI 可能不会立即刷新更改。尤其是在从创建动态任务生成的可迭代对象中添加或删除项目后的第一次运行中。
    • 如果您需要读取多个变量,请务必记住,建议将它们存储在一个 JSON 值中,以避免不断创建与元数据数据库的连接(此 article 中的示例)。

    祝你好运!

    编辑:

    另一个需要考虑的重点:

    • 使用这种方法,对Variable.get() 方法的调用是top-level code,因此调度程序每30 秒读取一次(默认为min_file_process_interval 设置)。这意味着每次都会连接到元数据数据库。

    编辑:

    • 添加了 if 子句来处理 emtpy iterable_list 情况。

    【讨论】:

    • 很好,应该注意 TaskGroup 仅是 2.0+ 功能
    • 感谢您的建议。很有帮助!!
    【解决方案2】:

    这是不可能的,一般不推荐动态任务:

    1. Airflow 调度程序的工作方式是读取 dag 文件,将任务加载到内存中,然后检查需要调度哪些 dag 和哪些任务,而 xcom 是与特定 dag 运行相关的运行时值,因此调度程序无法中继 xcom 值。
    2. 使用动态任务时,您自己的调试工作会变得更加困难,因为您用于创建 dag 的值可能会发生变化,您甚至会在不了解原因的情况下无法访问日志。

    您可以做的是使用分支运算符,始终执行这些任务并根据 xcom 值跳过它们。 例如:

    def branch_func(**context)
        return f"task_{context['ti'].xcom_pull(key=key)}"
    
    
    branch = BranchPythonOperator(
        task_id="branch",
        python_callback=branch_func
    )
    
    tasks = [BaseOperator(task_id=f"task_{i}") for i in range(3)]
    branch >> tasks
    

    在某些情况下,使用这种方法也不好(例如,当我有 100 个可能的任务时),在这些情况下,我建议您编写自己的运算符或使用单个 PythonOperator。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-05-21
      • 2023-02-03
      • 1970-01-01
      • 2019-01-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多