I know you asked about "executing those Python files (rather than Python functions via the PythonOperator)." But I think that would make less effective use of Airflow than it could. I also saw some confusion in the previously written answer, so here is the way you asked for, followed by the way I would recommend getting it done:
Assume the following layout:
dags/
    my_dag_for_task_1_and_2.py
    tasks/
        file1.py
        file2.py
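For either operator to work, each task file needs a callable entry point. A minimal sketch of what tasks/file1.py might contain (the function name main and its body are assumptions, not from your question):

```python
# tasks/file1.py (hypothetical contents)
def main():
    # whatever task 1 actually does; a placeholder here
    print("task 1 done")
    return "task 1 done"

# This guard lets the same file serve both styles: running it as a
# script (`python /path/to/dags/tasks/file1.py`) executes main(),
# while `import tasks.file1` only defines it.
if __name__ == "__main__":
    main()
```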
Since you asked to avoid the PythonOperator:
    # my_dag_for_task_1_and_2.py
    import datetime as dt

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    with DAG(
        'my_dag_for_task_1_and_2',
        default_args={
            'owner': 'me',
            'start_date': dt.datetime(…),
            …,
        },
        schedule_interval='8 * * * *',
    ) as dag:

        task_1 = BashOperator(
            task_id='task_1',
            bash_command='/path/to/python /path/to/dags/tasks/file1.py',
        )

        task_2 = BashOperator(
            task_id='task_2',
            bash_command='/path/to/python /path/to/dags/tasks/file2.py',
        )

        task_1 >> task_2
And what I would suggest instead, since you are not writing the Python from scratch for Airflow anyway: use the PythonOperator:
    # my_dag_for_task_1_and_2.py
    import datetime as dt

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    from tasks import file1, file2

    with DAG(
        'my_dag_for_task_1_and_2',
        default_args={
            'owner': 'me',
            'start_date': dt.datetime(…),
            …,
        },
        schedule_interval='8 * * * *',
    ) as dag:

        task_1 = PythonOperator(
            task_id='task_1',
            python_callable=file1.function_in_file1,
        )

        task_2 = PythonOperator(
            task_id='task_2',
            python_callable=file2.function_in_file2,  # maybe main?
        )

        task_1 >> task_2
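With an `if __name__ == "__main__"` guard in each task file, both wirings stay valid: the BashOperator runs the file as a script, while the PythonOperator imports the module and calls the function. A quick sketch of the two invocation paths, with no Airflow required to see the difference (the file contents and names here are hypothetical stand-ins):

```python
import importlib.util
import os
import subprocess
import sys
import tempfile
import textwrap

# A stand-in for tasks/file1.py (contents are hypothetical)
src = textwrap.dedent("""\
    def main():
        print("task 1 done")
        return "task 1 done"

    if __name__ == "__main__":
        main()
""")

tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, "file1.py")
with open(path, "w") as f:
    f.write(src)

# BashOperator style: run the file as a subprocess, as
# bash_command='/path/to/python /path/to/dags/tasks/file1.py' would.
# The __main__ guard fires, so main() runs and prints.
out = subprocess.run([sys.executable, path],
                     capture_output=True, text=True).stdout.strip()

# PythonOperator style: import the module (the guard does NOT fire)
# and hand the function to the operator as python_callable=file1.main.
spec = importlib.util.spec_from_file_location("file1", path)
file1 = importlib.util.module_from_spec(spec)
spec.loader.exec_module(file1)
```

The import path only defines main(); Airflow calls it later when the task actually runs, which is what lets the scheduler build the DAG cheaply without executing your task code.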