【问题标题】:How to use AirFlow to run a folder of python files?如何使用 AirFlow 运行 python 文件的文件夹?
【发布时间】:2017-01-29 08:03:24
【问题描述】:

我在一个 Python 文件文件夹中有一系列 Python 任务:file1.py、file2.py、...

我阅读了 Airflow 文档,但没有看到如何在 DAG 中指定 python 文件的文件夹和文件名?

我想执行那些 python 文件(不是通过 Python Operator 的 Python 函数)。

任务1:执行file1.py(带有一些导入包)

任务2:执行file2.py(与其他一些导入包)

这会很有帮助。谢谢,问候

【问题讨论】:

    标签: python airflow


    【解决方案1】:

    您可以使用 BashOperator 将 python 文件作为任务执行

        from airflow import DAG
        from airflow.operators import BashOperator,PythonOperator
        from datetime import datetime, timedelta
    
        seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                          datetime.min.time())
    
        default_args = {
            'owner': 'airflow',
            'depends_on_past': False,
            'start_date': seven_days_ago,
            'email': ['airflow@airflow.com'],
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 1,
            'retry_delay': timedelta(minutes=5),
          )
    
        dag = DAG('simple', default_args=default_args)
    t1 = BashOperator(
        task_id='testairflow',
        bash_command='python /home/airflow/airflow/dags/scripts/file1.py',
        dag=dag)
    

    【讨论】:

    • 这里为什么要导入PythonOperator?你好像没用过。
    【解决方案2】:

    使用BashOperator(如liferacer的回答)作为一个整体执行python文件:

    from airflow.operators.bash_operator import BashOperator
    
    bash_task = BashOperator(
        task_id='bash_task',
        bash_command='python file1.py',
        dag=dag
    )
    

    然后,使用PythonOperator 调用您的main 函数来完成此操作。您应该已经有一个 __main__ 块,所以将其中发生的事情放入 main 函数中,这样您的 file1.py 看起来像这样:

    def main():
        """This gets executed if `python file1` gets called."""
        # my code
    
    if __name__ == '__main__':
        main() 
    

    然后你的 dag 定义:

    from airflow.operators.python_operator import PythonOperator
    
    import file1
    
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=file1.main,
        dag=dag
    )
    

    【讨论】:

    • bash_task = PythonOperator… bash_operator import PythonOperator 非常混乱,很可能是一个错误。
    • 你的第一段代码不应该使用BashOperator而不是PythonOperator吗?
    • 谢谢。调用 module.main 是必要的,尤其是当您在 ipython 上调用远程函数时。否则你会得到像异常_前缀_dac28c52b432fb881bf6fa1c4f25b4960b7ffe5a_ipython_dag这样的错误。
    【解决方案3】:

    我知道您问的是“想要执行那些 Python 文件(而不是通过 Python 运算符执行的 Python 函数)”。但我认为这可能比您使用 Airflow 的效率低。我在之前写的答案中也看到了混乱,所以这是您想要的方式,也是我建议的完成任务的方式:

    假设:

    dags/
        my_dag_for_task_1_and_2.py
        tasks/
             file1.py
             file2.py
    

    您要求避免PythonOperator

    #  my_dag_for_task_1_and_2.py
    import datetime as dt
    from airflow import DAG
    from airflow.operators import BashOperator
    
    with DAG(
        'my_dag_for_task_1_and_2',
        default_args={
            'owner': 'me',
            'start_date': datetime(…),
            …,
        }, 
        schedule_interval='8 * * * *',
    ) as dag:
        task_1 = BashOperator(
            task_id='task_1', 
            bash_command='/path/to/python /path/to/dags/tasks/file1.py',
        )
        task_2 = BashOperator(
            task_id='task_2', 
            bash_command='/path/to/python /path/to/dags/tasks/file2.py',
        )
        task_1 >> task_2
    

    您不是从头开始为 Airflow 编写 Python,而是使用 PythonOperator

    #  my_dag_for_task_1_and_2.py
    import datetime as dt
    from airflow import DAG
    from airflow.operators import PythonOperator
    import tasks.file1
    import tasks.file2
    
    with DAG(
        'my_dag_for_task_1_and_2',
        default_args={
            'owner': 'me',
            'start_date': datetime(…),
            …,
        }, 
        schedule_interval='8 * * * *',
    ) as dag:
        task_1 = PythonOperator(
            task_id='task_1', 
            python_callable=file1.function_in_file1,
        )
        task_2 = PythonOperator(
            task_id='task_2', 
            python_callable=file2.function_in_file2,  # maybe main?
        )
        task_1 >> task_2
    

    【讨论】:

    • 我喜欢你使用with DAG(...) as dag: ...。改善上下文。
    • @Wordsmyth 社区正在努力更新所有示例,以显示在 DAG 和任务依赖项中指定任务的所有不同方式的混合。所以这有望很快成为一个不那么“隐藏”的功能。
    • @AshishKumar 您可能需要在脚本文件夹中放置一个__init__.py 文件。它可以是空的。参考docs.python.org/3/tutorial/modules.html
    • @dlamblin 按照您的回答,我得到错误任务模块未找到。如上所述,我包含了一个空的 init.py 文件。有什么想法吗?
    • @JavierLópezTomás 它对目录和文件布局很敏感;这里带有__init__.pytasks 目录位于DAGs 文件夹的顶层。 Airflow 将该文件夹添加到 PYTHONPATH 如果您将其设为子文件夹,则您需要将模块路径一直包含到文件中,例如 subfolder.tasks.file1 这意味着每个父文件夹中还有另一个 __init__.py(可能还有 DAG文件夹本身)。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-05-12
    • 2020-03-09
    • 2016-02-01
    • 1970-01-01
    • 2022-01-23
    • 2014-07-17
    • 2016-11-18
    相关资源
    最近更新 更多