【发布时间】:2022-01-19 12:13:11
【问题描述】:
我想根据另一个任务的结果在流程中动态创建任务。是否可以通过以下方式做到这一点?
我可以通过将任务中的数据存储在一个临时文件中并在下一个任务中读取它来做到这一点 - 但我想只使用气流资源来解决这个问题。
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
@task
def extract() -> dict:
return {"1001": 301.27, "1002": 433.21, "1003": 502.22}
def load_inner(dict_value_fun):
print("Value is: {}".format(str(dict_value_fun)))
default_args = {
'start_date': days_ago(2),
'email_on_failure': False,
'depends_on_past': False,
'owner': 'airflow',
'retries': 1,
}
@dag(default_args=default_args, schedule_interval=None,)
def tutorial_taskflow_api_etl():
order_data = extract()
for key in order_data:
@task(task_id='task_key_is_{}'.format(key))
def load(dict_value: dict, key: str):
load_inner(dict_value[key])
order_data >> load(order_data, key)
tutorial_etl_dag = tutorial_taskflow_api_etl()
错误信息:
The key (dict_key_is_{{ task_instance.xcom_pull(task_ids='extract', dag_id='tutorial_taskflow_api_etl', key='0') }}) has to be made of alphanumeric characters, dashes, dots and underscores exclusively
【问题讨论】:
标签: python-decorators airflow-2.x