【发布时间】:2019-03-30 11:24:13
【问题描述】:
我已经在 DAG(Bash 和 Docker 运算符)中成功创建了动态任务,但我很难将这些动态创建的任务传递给 xcom_pull 以获取数据。
for i in range(0, max_tasks):
task_scp_queue = BashOperator(task_id="scp_queue_task_{}".format(i), bash_command="""python foo""", retries=3, dag=dag, pool="scp_queue_pool", queue="foo", provide_context=True, xcom_push=True) # Pull the manifest ID from the previous task via xcom'
task_process_queue = DockerOperator(task_id="process_task_{}".format(i), command="""python foo --queue-name={{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}""".format(i), retries=3, dag=dag, pool="process_pool", api_version="auto", image="foo", queue="foo", execution_timeout=timedelta(minutes=5))
task_manifest = DockerOperator(api_version="auto", task_id="manifest_task_{}".format(i), image="foo", retries=3, dag=dag, command=""" python --manifestid={{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}""".format(i), pool="manfiest_pool", queue="d_parser")
task_psql_queue.set_downstream(task_scp_queue)
task_process_queue.set_upstream(task_scp_queue)
task_manifest.set_upstream(task_process_queue)
如您所见,我尝试在 Jinja 模板中使用 Python 格式字符串来传递其中的 i 变量,但这不起作用。
我也尝试过使用“task.task_id”,并仅使用 task_id 创建一个新字符串,但这也不起作用。
编辑:
现在命令看起来像这样
command="""python foo \
--queue-name="{{
task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}"
""".format(i)
我的 Airflow 调试日志看起来像
Using Master Queue: process_{
task_instance.xcom_pull(task_ids='scp_queue_task_31') }
所以字符串值正在被填充,但它没有执行 xcom_pull。
【问题讨论】: