【发布时间】:2021-06-16 08:30:02
【问题描述】:
我在 Windows 中有以下目录/文件结构:
Learn Airflow
|
project
|_dags
|_file_mover.py
|_first_dag.py
|_dockerfiles
|_Dockerfile
|_docker-compose.yml
在file_mover.py 中,我有一个简单的脚本将一些文件从LocationA 移动到LocationB。在first_dag.py 中,我有一个触发file_mover.py 的脚本。因此,当我在终端中执行 docker-compose up --build 并检查 webserver localhost:8080 时,我确实在 Airflow 中看到了 first_dag。因此,当我打开该 DAG 时,我预计文件将从 LocationA 移动到 LocationB e.q。 file_mover.py 被触发.. 然而,这并没有发生,我不知道为什么。
这是 file_mover.py
import os
import shutil
location_a = r'c:\data\GG\Desktop\LocationA'
location_b = r'c:\data\GG\Desktop\LocationB'
files = os.listdir(location_a)
for f in files:
file_path = os.path.join(location_a, f)
shutil.move(file_path, location_b)
这是 first_dag.py
try:
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import os
import sys
print('All dag modules are ok.....')
except Exception as e:
print('Error {}'.format(e))
def first_function_execute():
os.system('python c:\data\GG\Desktop\Python Microsoft Visual Studio\Learn Airflow\project\dags\file_mover.py')
with DAG (
dag_id = 'first_dag',
schedule_interval='@daily',
default_args={
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5),
'start_date': datetime(2021, 1, 1),
},
catchup=False) as f:
first_function_execute = PythonOperator(
task_id='first_function_execute',
python_callable=first_function_execute)
我最终想要的是通过 Airflow localhost 来安排和监控 file_mover.py 应用程序,但是上面的尝试似乎不起作用......
【问题讨论】:
标签: python docker-compose airflow airflow-scheduler