【发布时间】:2016-02-04 06:27:49
【问题描述】:
我正在尝试使用 Airflow 来执行一个简单的任务 python。
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}
dag = DAG(dag_id='python_test', default_args=args)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=print_context,
dag=dag)
如果我尝试,例如:
气流测试 python_test print 2015-01-01
有效!
现在我想将我的 def print_context(ds, **kwargs) 函数放在其他 python 文件中。所以我创建了另一个名为:simple_test.py 的文件并更改:
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=simple_test.print_context,
dag=dag)
现在我尝试再次运行:
气流测试 python_test print 2015-01-01
好吧!它仍然有效!
但如果我创建一个模块,例如,带有文件SimplePython.py 的工作模块,导入 (from worker import SimplePython) 并尝试:
气流测试 python_test print 2015-01-01
它给出了信息:
ImportError: 没有名为 worker 的模块
问题:
- 是否可以在 DAG 定义中导入模块?
- Airflow+Celery 将如何在工作节点之间分发所有必要的 python 源文件?
【问题讨论】:
标签: python celery celery-task airflow