【发布时间】:2018-06-10 13:49:47
【问题描述】:
我有一个可以从 CronJob 运行的 Python 脚本。我想使用PythonOperator(s) 将其转换为 DAG,因为我们现在正在转换为 Airflow。
说我有函数:a(),b(),c(),d()
他们的执行顺序是:a->b->c->d
假设功能代码是:
def a():
print("Happy")
def b():
print("Birthday")
def c():
print("to")
def d():
print("you!")
** 这只是一个示例,我的所有函数的代码都比较复杂
我有这个 DAG:
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
'schedule_interval': '0 10 * * *'
}
dag = DAG(dag_id='example', default_args=args)
a = PythonOperator(task_id='a', dag=dag)
b = PythonOperator(task_id='b', dag=dag)
c = PythonOperator(task_id='c', dag=dag)
d = PythonOperator(task_id='d', dag=dag)
a.set_downstream(b)
b.set_downstream(c)
c.set_downstream(d)
我不明白的是我把a(),b(),c(),d()的代码放在哪里,在PythonOperator的执行中我在哪里指定他们的名字。
您可以说我正在寻找一种将我的 Python 脚本转换为 Airflow 的方法,因为每个函数都是一个单独的运算符。
我认为这应该非常简单和基本,但我没有找到有关如何执行此操作的任何信息。
【问题讨论】:
-
请注意,任务 a、b、c 和 d 可以在不同的工作人员上运行。