【发布时间】:2019-06-12 16:16:26
【问题描述】:
我正在尝试从 XCOM 变量生成一组动态任务。在 XCOM 中,我存储了一个列表,我想使用列表中的每个元素来动态创建下游任务。
我的用例是我有一个上游操作员,它检查 sftp 服务器的文件并返回与特定条件匹配的文件名列表。我想为返回的每个文件名创建动态下游任务。
我已将其简化为以下内容,虽然它有效,但我觉得它不是一种惯用的气流解决方案。在我的用例中,我将编写一个从 python 运算符调用的 python 函数,该函数从 xcom 中提取值并返回它,而不是使用 pusher 函数。
我知道,虽然我可以创建一个结合了两者的自定义运算符,但我不认为创建一次性运算符是一种好习惯,我希望有另一种解决方案。
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
"owner": "test",
"depends_on_past": False,
"start_date": datetime(2018, 10, 27),
"email": ["test@mctest.com"],
"email_on_failure": False,
"email_on_retry": False,
"email_on_success": False,
"retries": 0,
"provide_context": True
}
dag = DAG("test", default_args=default_args, schedule_interval="@daily", catchup=False)
def pusher(**context):
return ['a', 'b', 'c', 'd', 'e']
pusher_task = PythonOperator(
task_id='pusher_task',
dag=dag,
python_callable=pusher
)
def bash_wrapper(task, **context):
return BashOperator(
task_id='dynamic'+task,
dag=dag,
bash_command='date'
)
end = BashOperator(task_id='end', dag=dag, bash_command='echo task has ended')
pusher_task >> [bash_wrapper(task) for task in pusher()] >> end
【问题讨论】: