【问题标题】:Convert python script to Airflow PythonOperator(s)将 python 脚本转换为 Airflow PythonOperator(s)
【发布时间】:2018-06-10 13:49:47
【问题描述】:

我有一个可以从 CronJob 运行的 Python 脚本。我想使用PythonOperator(s) 将其转换为 DAG,因为我们现在正在转换为 Airflow。

说我有函数:a(),b(),c(),d() 他们的执行顺序是:a->b->c->d

假设功能代码是:

def a(): 
    print("Happy")

def b(): 
    print("Birthday")

def c(): 
    print("to")

def d(): 
    print("you!")

** 这只是一个示例,我的所有函数的代码都比较复杂

我有这个 DAG:

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'schedule_interval': '0 10 * * *'
}

dag = DAG(dag_id='example', default_args=args)

a = PythonOperator(task_id='a', dag=dag)
b = PythonOperator(task_id='b', dag=dag)
c = PythonOperator(task_id='c', dag=dag)
d = PythonOperator(task_id='d', dag=dag)

a.set_downstream(b)
b.set_downstream(c)
c.set_downstream(d)

我不明白的是我把a(),b(),c(),d()的代码放在哪里,在PythonOperator的执行中我在哪里指定他们的名字。

您可以说我正在寻找一种将我的 Python 脚本转换为 Airflow 的方法,因为每个函数都是一个单独的运算符。

我认为这应该非常简单和基本,但我没有找到有关如何执行此操作的任何信息。

【问题讨论】:

  • 请注意,任务 a、b、c 和 d 可以在不同的工作人员上运行。

标签: python airflow


【解决方案1】:

在python算子中,将应该执行的python函数传递给算子。所以你会想像这样传递python_callable kwarg:

def do_a():
    print('running a')

a = PythonOperator(task_id='a', python_callable=do_a, dag=dag)

操作员的来源通常会记录他们的参数。 Python operator docs

【讨论】:

    猜你喜欢
    • 2021-04-15
    • 2011-02-19
    • 2021-03-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-01-31
    • 2020-06-10
    • 1970-01-01
    相关资源
    最近更新 更多