【问题标题】:Airflow - creating dynamic Tasks from XCOMAirflow - 从 XCOM 创建动态任务
【发布时间】:2019-06-12 16:16:26
【问题描述】:

我正在尝试从 XCOM 变量生成一组动态任务。在 XCOM 中,我存储了一个列表,我想使用列表中的每个元素来动态创建下游任务。

我的用例是我有一个上游操作员,它检查 sftp 服务器的文件并返回与特定条件匹配的文件名列表。我想为返回的每个文件名创建动态下游任务。

我已将其简化为以下内容,虽然它有效,但我觉得它不是一种惯用的气流解决方案。在我的用例中,我将编写一个从 python 运算符调用的 python 函数,该函数从 xcom 中提取值并返回它,而不是使用 pusher 函数。

我知道,虽然我可以创建一个结合了两者的自定义运算符,但我不认为创建一次性运算符是一种好习惯,我希望有另一种解决方案。

from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    "owner": "test",
    "depends_on_past": False,
    "start_date": datetime(2018, 10, 27),
    "email": ["test@mctest.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "email_on_success": False,
    "retries": 0,
    "provide_context": True
}

dag = DAG("test",  default_args=default_args, schedule_interval="@daily", catchup=False)


def pusher(**context):
    return ['a', 'b', 'c', 'd', 'e']

pusher_task = PythonOperator(
    task_id='pusher_task',
    dag=dag,
    python_callable=pusher  
)

def bash_wrapper(task, **context):
    return BashOperator(
        task_id='dynamic'+task,
        dag=dag,
        bash_command='date'
    )

end = BashOperator(task_id='end', dag=dag, bash_command='echo task has ended')


pusher_task >> [bash_wrapper(task) for task in pusher()] >> end

【问题讨论】:

    标签: python airflow


    【解决方案1】:

    我不会做你想要达到的目标,主要是因为:

    1. XCOM 值是在运行时中生成的状态
    2. DAG 结构是在解析时间中确定的

    即使您使用类似以下的方法来访问由某些上游任务生成的 XCOM 值:

    from airflow.models import TaskInstance
    from airflow.utils.db import provide_session
    
    dag = DAG(...)
    
    @provide_session
    def get_files_list(session):
        execution_date = dag.previous_schedule(datetime.now())
    
        // Find previous task instance:
        ti = session.query(TaskInstance).filter(
            TaskInstance.dag_id == dag.dag_id,
            TaskInstance.execution_date == execution_date,
            TaskInstance.task_id == upstream_task_id).first()
        if ti:
            files_list = ti.xcom_pull()
            if files_list:
                return files_list
        // Return default state:
        return {...}
    
    
    files_list = get_files_list()
    // Generate tasks based on upstream task state:
    task = PythonOperator(
        ...
        xcom_push=True,
        dag=dag)
    

    但这会表现得很奇怪,因为 DAG 解析和任务执行不会以您希望的方式同步。

    如果您想要这样做的主要原因是并行文件处理,我会有一些静态数量的处理任务(由所需的并行度决定)从上游任务的 XCOM 值读取文件列表并在相关部分进行操作那个列表。

    另一种选择是使用一些分布式计算框架(如 Apache Spark)并行化文件处理。

    【讨论】:

    • 这样做的主要原因是 1. 并行性,2. 我无法控制从上游任务接收到的列表中有多少文件/或数据,但我想确保所有文件都通过下游任务
    • 使用您提到的第一种方法,假设有 5 个文件和两个处理任务。在处理任务处理完列表中的 file1 和 file2 后,如何再次为 file3 和 file4 触发相同的任务?最后我怎样才能单独为file5触发1个处理任务?这个方向的任何例子都有帮助
    • @nightgaunt 每个处理任务使用files[int(task_idx * len(files) / parallelism):int((task_idx + 1) * len(files) / parallelism)] 获取files 数组的一部分,这样第一个任务获取文件1-2,而第二个任务获取文件3 到5。
    • 但是单个任务需要通过在单个运算符中迭代来处理多个文件。正确的?这需要一个自定义运算符来执行两次/三次相同的操作。不要采取坏的方式。你的解决方案是有道理的,也是我最接近答案的。但是,如果有一种方法可以让任务只对一个文件进行操作,我愿意选择该选项。
    • 一种使用 subdags 和动态生成任务的解决方案在我脑海中酝酿了一段时间。但是你关于 xcom 是运行时间而结构是解析时间的解释让我犹豫了。
    【解决方案2】:

    我能想到的最简单的方法是使用分支运算符。 https://github.com/apache/airflow/blob/master/airflow/example_dags/example_branch_operator.py

    【讨论】:

    • 你能详细说明一下吗? BranchOperator 如何帮助访问 xcom 变量并在其上设置动态下游任务?
    • 您的示例展示了如何以编程方式创建分支 - 问题是,如何动态创建任务,具体取决于前一个任务在运行时获取的结果我>
    猜你喜欢
    • 2019-03-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-18
    • 2021-08-28
    • 2018-07-27
    • 1970-01-01
    相关资源
    最近更新 更多