【问题标题】:How to schedule a Python script in Airflow?如何在 Airflow 中安排 Python 脚本?
【发布时间】:2021-06-16 08:30:02
【问题描述】:

我在 Windows 中有以下目录/文件结构:

Learn Airflow
        |
        project
           |_dags
               |_file_mover.py
               |_first_dag.py
            |_dockerfiles
                |_Dockerfile
            |_docker-compose.yml

file_mover.py 中,我有一个简单的脚本将一些文件从LocationA 移动到LocationB。在first_dag.py 中,我有一个触发file_mover.py 的脚本。因此,当我在终端中执行 docker-compose up --build 并检查 webserver localhost:8080 时,我确实在 Airflow 中看到了 first_dag。因此,当我打开该 DAG 时,我预计文件将从 LocationA 移动到 LocationB e.q。 file_mover.py 被触发.. 然而,这并没有发生,我不知道为什么。

这是 file_mover.py

import os
import shutil  

location_a = r'c:\data\GG\Desktop\LocationA'
location_b = r'c:\data\GG\Desktop\LocationB'

files = os.listdir(location_a)

for f in files:
    file_path = os.path.join(location_a, f)
    shutil.move(file_path, location_b)

这是 first_dag.py

    try: 
     from datetime import timedelta
     from airflow import DAG
     from airflow.operators.python_operator import PythonOperator
     from datetime import datetime
     import os
     import sys
     print('All dag modules are ok.....')

except Exception as e:
    print('Error {}'.format(e))

def first_function_execute():
     os.system('python c:\data\GG\Desktop\Python Microsoft Visual Studio\Learn Airflow\project\dags\file_mover.py')
  
with DAG (
     dag_id = 'first_dag',
     schedule_interval='@daily',
     default_args={
          'owner': 'airflow',
          'retries': 1,
          'retry_delay':  timedelta(minutes=5), 
          'start_date': datetime(2021, 1, 1),
     },
     catchup=False) as f:

     first_function_execute = PythonOperator(
          task_id='first_function_execute',
          python_callable=first_function_execute)
      

我最终想要的是通过 Airflow localhost 来安排和监控 file_mover.py 应用程序,但是上面的尝试似乎不起作用......

【问题讨论】:

    标签: python docker-compose airflow airflow-scheduler


    【解决方案1】:

    正如您已正确解决的那样,应该使用PythonOperator 来运行您的脚本。我会把它作为一个函数来导出,然后你可以简单地从一个模块中导入它,只要它可以在你的 PYTHONPATH 中访问。

    import os
    import shutil  
    
    def move(location_a, location_b):
        files = os.listdir(location_a)
        for f in files:
            file_path = os.path.join(location_a, f)
            shutil.move(file_path, location_b)
    

    然后您可以使用默认参数和调度间隔来调度 DAG:

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    
    from my_script import my_python_function
    
    default_args = {
            'owner': 'airflow',
            'depends_on_past': False,
            'start_date': datetime.today(),
            'email': ['airflow@airflow.com'],
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 1,
            'retry_delay': timedelta(minutes=5),
    }
    
    dag = DAG('tutorial', default_args=default_args,schedule_interval="* * * * *")
    
    PythonOperator(dag=dag,
                   task_id='my_move_task',
                   provide_context=False,
                   python_callable=move,
                   op_args=['arguments_passed_to_callable'],
                   op_kwargs={'keyword_argument':'which will be passed to function'})
    

    然后在启动 Airflow 之前,您可以将脚本的路径添加到 PYTHONPATH,如下所示:

    export PYTHONPATH=/path/to/my/scripts/dir/:$PYTHONPATH
    

    有关 Python 操作以及如何将参数传递给函数的更多信息:https://airflow.incubator.apache.org/code.html#airflow.operators.PythonOperator

    【讨论】:

    • 在我的情况下,my_scriptmy_python_function 是什么?
    • 你的脚本是file_mover.py,python函数是我在答案中安排的move函数。
    • 我收到以下错误:webserver_1 | File "/usr/local/airflow/dags/file_mover.py", line 8, in <module> webserver_1 | files = os.listdir(location_a) webserver_1 | FileNotFoundError: [Errno 2] No such file or directory: 'c:\\data\\GG\\Desktop\\LocationA' webserver_1 | [2021-06-17 07:25:59 +0000] [211] [INFO] Handling signal: ttou webserver_1 | [2021-06-17 07:25:59 +0000] [233] [INFO] Worker exiting (pid: 233)
    • 您的脚本在 c:\\data\\GG\\Desktop\\LocationA 中找不到文件。检查目录。
    • 目录没有问题。我想这可能是卷或什么的东西......
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-03-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-11
    • 2016-09-29
    • 2021-12-14
    相关资源
    最近更新 更多