【问题标题】:Airflow Macros In Python OperatorPython运算符中的气流宏
【发布时间】:2018-06-05 20:12:20
【问题描述】:

我正在尝试在我的 Python Operator 中使用 Airflow 宏,但我不断收到“airflow: error: unrecognized arguments:”

所以我正在导入一个具有 3 个位置参数的函数:(sys.argv,start_date,end_date),我希望将 start_date end_date Airflow 中的执行日期。

函数参数看起来像这样

def main(argv,start_date,end_date):

这是我在 DAG 中的任务:

t1 = PythonOperator(
    task_id='Pull_DCM_Report',
    provide_context=True,
    python_callable=main,
    op_args=[sys.argv,'{{ ds }}','{{ ds }}'],
    dag=dag)

【问题讨论】:

    标签: airflow


    【解决方案1】:

    由于您要传入需要由 Airflow 呈现的日期,因此您需要在 Python 运算符中使用 templates_dict 参数。此字段是 Airflow 将识别为包含模板的唯一字段。

    您可以创建一个自定义 Python 运算符,通过复制现有运算符并将相关字段添加到 template_fields 元组,将更多字段识别为模板。

    def main(**kwargs):
        argv = kwargs.get('templates_dict').get('argv')
        start_date = kwargs.get('templates_dict').get('start_date')
        end_date = kwargs.get('templates_dict').get('end_date')
    
    
    t1 = PythonOperator(task_id='Pull_DCM_Report',
                        provide_context=True,
                        python_callable=main,
                        templates_dict={'argv': sys.argv,
                                        'start_date': '{{ yesterday_ds }}',
                                        'end_date': '{{ ds }}'},
                        dag=dag)
    

    【讨论】:

      【解决方案2】:

      您可以使用以下内容“包装”对main 函数的调用:

      t1 = PythonOperator(
          task_id='Pull_DCM_Report',
          provide_context=True,
          python_callable=lambda **context: main([], context["ds"], context["ds"]),
          dag=dag)
      

      如果 lambdas 不是您的菜,您可以定义一个函数,调用它,然后将其调用到 main

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2019-07-10
        • 2019-05-13
        • 2019-07-09
        • 1970-01-01
        • 2023-03-21
        • 2021-07-02
        • 2022-01-19
        • 1970-01-01
        相关资源
        最近更新 更多