【问题标题】:Apache Airflow Xcom Pull from dynamic task nameApache Airflow Xcom 从动态任务名称中提取
【发布时间】:2019-03-30 11:24:13
【问题描述】:

我已经在 DAG(Bash 和 Docker 运算符)中成功创建了动态任务,但我很难将这些动态创建的任务传递给 xcom_pull 以获取数据。

for i in range(0, max_tasks):
    task_scp_queue = BashOperator(task_id="scp_queue_task_{}".format(i), bash_command="""python foo""", retries=3, dag=dag, pool="scp_queue_pool", queue="foo", provide_context=True, xcom_push=True) # Pull the manifest ID from the previous task via xcom'

    task_process_queue = DockerOperator(task_id="process_task_{}".format(i), command="""python foo --queue-name={{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}""".format(i), retries=3, dag=dag, pool="process_pool", api_version="auto", image="foo", queue="foo", execution_timeout=timedelta(minutes=5))
    task_manifest = DockerOperator(api_version="auto", task_id="manifest_task_{}".format(i), image="foo", retries=3, dag=dag, command=""" python --manifestid={{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}""".format(i), pool="manfiest_pool", queue="d_parser")

    task_psql_queue.set_downstream(task_scp_queue)
    task_process_queue.set_upstream(task_scp_queue)
    task_manifest.set_upstream(task_process_queue)

如您所见,我尝试在 Jinja 模板中使用 Python 格式字符串来传递其中的 i 变量,但这不起作用。

我也尝试过使用“task.task_id”,并仅使用 task_id 创建一个新字符串,但这也不起作用。

编辑:

现在命令看起来像这样

command="""python foo \ 
    --queue-name="{{ 
    task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}" 
     """.format(i)

我的 Airflow 调试日志看起来像

Using Master Queue: process_{ 
task_instance.xcom_pull(task_ids='scp_queue_task_31') }

所以字符串值正在被填充,但它没有执行 xcom_pull。

【问题讨论】:

    标签: python jinja2 airflow


    【解决方案1】:

    我很困惑这怎么行不通。记录您遇到的错误会很有帮助。

    简而言之,你所做的看起来不错,如果max_tasks=2 你会得到:

    task_psql_queue.taskid --> scp_queue_task_0 >> process_task_0 >> manifest_task_0
                           \-> scp_queue_task_1 >> process_task_1 >> manifest_task_1
    

    我怀疑你不需要超时,这真的很短。因为你有很长的行并且随机重新排列你的命名参数,所以我将重新格式化你写的内容:

    for i in range(0, max_tasks):
        task_scp_queue = BashOperator(
            task_id="scp_queue_task_{}".format(i),
            dag=dag,
            retries=3,  # you could make it a default arg on the dag
            pool="scp_queue_pool",
            queue="foo", # you really want both queue and pool? When debugging remove them.
            bash_command="python foo",  # Maybe you snipped a multiline command
            provide_context=True,  # BashOp doesn't have this argument
            xcom_push=True,  # PUSH the manifest ID FOR the NEXT task via xcom
        )
    
        task_process_queue = DockerOperator(
            task_id="process_task_{}".format(i),
            dag=dag,
            retries=3,
            pool="process_pool",
            queue="foo",
            execution_timeout=timedelta(minutes=5),
            api_version="auto",
            image="foo",
            command="python foo --queue-name="
                    "{{{{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}}}".format(i),
        )
    
        task_manifest = DockerOperator(
            task_id="manifest_task_{}".format(i),
            retries=3,
            dag=dag,
            pool="manfiest_pool",
            queue="d_parser",
            api_version="auto",
            image="foo",
            command="python --manifestid="
                    "{{{{ task_instance.xcom_pull(task_ids=scp_queue_task_{}) }}}}".format(i),
        )
    
        task_psql_queue >> task_scp_queue >> task_process_queue >> task_manifest
    

    哦,现在看,您没有将task_ids 作为字符串传递。试试:

            command="python foo --queue-name="
                    "{{{{ task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}}}".format(i),
    … … …
            command="python --manifestid="
                    "{{{{ task_instance.xcom_pull(task_ids='scp_queue_task_{}') }}}}".format(i),
    

    【讨论】:

    • 感谢您的帮助!我在上面对我的尝试和结果进行了编辑。
    • @gleb1783 抱歉,我之前的回答忘记了"{{ blah_{} }}".format(1) 产生"{ blah_1 }"。我需要将那些双花括号增加四倍(EG "{{{{ blah_{} }}}}".format(1) 变为 "{{ blah_1 }}"
    • 这种格式也适用于 bash 命令运算符。我自己也遇到过那个用例。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-13
    • 1970-01-01
    • 1970-01-01
    • 2018-07-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多