【问题标题】:Passing a command line argument to airflow BashOperator将命令行参数传递给气流 BashOperator
【发布时间】:2017-02-03 03:54:13
【问题描述】:

有没有办法将命令行参数传递给 Airflow BashOperator。目前,我有一个 python 脚本,它接受一个日期参数并执行一些特定的活动,比如清理比给定日期更旧的特定文件夹。

在只有一项任务的简化代码中,我想做的是

from __future__ import print_function
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

default_args = {
    'owner'             : 'airflow'
    ,'depends_on_past'  : False
    ,'start_date'       : datetime(2017, 01, 18)
    ,'email'            : ['abc@xyz.com']
    ,'retries'          : 1
    ,'retry_delay'      : timedelta(minutes=5)
}

dag = DAG(
    dag_id='data_dir_cleanup'
    ,default_args=default_args
    ,schedule_interval='0 13 * * *'
    ,dagrun_timeout=timedelta(minutes=10)
    )

cleanup_task = BashOperator(
        task_id='task_1_data_file_cleanup'
        ,bash_command='python cleanup.py --date $DATE 2>&1 >>  /tmp/airflow/data_dir_cleanup.log'
        #--------------------------------------^^^^^^-- (DATE variable which would have been given on command line)
        #,env=env
        ,dag=dag
    )

提前致谢,

【问题讨论】:

    标签: python bash workflow airflow


    【解决方案1】:

    BashOperator 使用 Jinja2 进行模板化,这意味着您可以传递任意值。在你的情况下,它会是这样的:

    cleanup_task = BashOperator(
            task_id='task_1_data_file_cleanup'
            ,bash_command="python cleanup.py --date {{ params.DATE }} 2>&1 >>  /tmp/airflow/data_dir_cleanup.log"
            ,params = {'DATE' : 'this-should-be-a-date'}
            ,dag=dag
        )
    

    有关更广泛的示例,另请参阅:https://airflow.incubator.apache.org/tutorial.html#templating-with-jinja

    【讨论】:

    • 我建议使用{{ params.DATE }} 而不是{{ DATE }} 来澄清它的来源。更重要的是,我会使用小写的{{ params.date }},因为它不是一个常数。
    • @nandoquintana 编辑了代码以反映这一点,因为代码必须正常工作
    【解决方案2】:

    您可以尝试以下方法(对我有用):

    cmd_command = "python path_to_task/[task_name.py] '{{ execution_date }}' '{{ prev_execution_date }}'"
    
    t = BashOperator(
         task_id = 'some_id',
         bash_command = cmd_command,
         dag = your_dag_object_name)
    

    当我这样做时,它会渲染变量,并且效果很好。我相信它适用于所有变量(请注意,我在命令的开头添加了“python”这个词,因为我想运行一个 .py 脚本。

    我的任务是正确编写的,以便将这些变量作为命令行参数(sys.argv 属性)读取。

    【讨论】:

      【解决方案3】:

      BashOperator 是 Jinja 模板化的,因此参数可以作为字典传递。

      Airflow 将安排任务并且不会提示您输入参数,因此当您说“需要将特定日期作为命令行参数传递”时,这是不可能的。尽管 Airflow 有一个 EXECUTION DATE 的概念,它是 dag 计划运行的日期,可以使用宏 {{ ds }} 或 {{ ds_nodash }} (https://airflow.incubator.apache.org/code.html#macros)

      在 BashOperator 参数中传递
      env = {}
      env['DATE'] = '{{ ds }}'  
      cleanup_task = BashOperator(
              task_id='task_1_data_file_cleanup'
              ,bash_command='python cleanup.py --date $DATE 2>&1 >>  /tmp/airflow/data_dir_cleanup.log'
              ,params=env
              ,dag=dag
          )
      

      “DATE”参数将被传递给 bash 脚本,并且可以用作任何其他带有 $DATE 的 bash 变量

      【讨论】:

      • 我尝试了这个解决方案,但 DS 没有在那里呈现。我找不到将 ds 作为参数传递的方法!
      【解决方案4】:

      试试os.system("YOUR COMMAND HERE")

      【讨论】:

        猜你喜欢
        • 2023-03-15
        • 1970-01-01
        • 2015-06-09
        • 2017-10-22
        • 2019-10-12
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多