【发布时间】:2024-01-19 04:29:01
【问题描述】:
我想像这样运行气流 ->
- 我有 2 名气流工作人员 W1 和 W2。
- 在 W1 中,我安排了一个任务 (W1-1),但在 W2 中,我想创建 X 个任务(W2-1、W2-2 ... W2-X)。
- 每个任务的数字 X 和 bash 命令将来自 DB 调用。
- 工作人员 W2 的所有任务应在 W1 完成后并行运行。
这是我的代码
dag = DAG('deploy_single', catchup=False, default_args=default_args, schedule_interval='16 15 * * *')
t1 = BashOperator(
task_id='dummy_task',
bash_command='echo hi > /tmp/hi',
queue='W1_queue',
dag=dag)
get_all_engines = "select full_command, queue_name from internal_airflow_hosts where logical_group = 'live_engines';"
db_creds = json.loads(open('/opt/airflow/db_creds.json').read())
conn_dict = db_creds["airflowdb_local"]
connection = psycopg2.connect(**conn_dict)
cursor = connection.cursor()
cursor.execute(get_all_engines)
records = cursor.fetchall()
i = 1
for record in records:
t = BashOperator(
task_id='script_test_'+str(i),
bash_command="{full_command} ".format(full_command=str(record[0])),
queue=str(record[1]),
dag=dag)
t.set_upstream(t1)
i += 1
cursor.close()
connection.close()
但是,当我运行它时,W1 上的任务成功完成,但 W2 上的所有任务都失败了。在气流 UI 中,我可以看到它可以解决正确数量的任务(在本例中为 10 个),但是这 10 个任务中的每一个都失败了。
查看日志,我看到在 W2(在另一台机器上)上,airflow 找不到 db_creds.json 文件。
我不想将 DB creds 文件提供给 W2。
我的问题是在这种情况下如何动态创建气流任务? 基本上我想在气流服务器上运行一个数据库查询,并根据该查询的结果将任务分配给一个或多个工作人员。数据库将包含有关哪些引擎处于活动状态等的更新信息,我希望 DAG 反映这一点。从日志来看,看起来每个工作人员都在运行数据库查询。向每个工作人员提供对数据库的访问权限不是一种选择。
【问题讨论】:
-
W2 任务失败的原因是正在运行的任务必须存在于 DAG 中。当它被硬编码时,这不是问题,但是由于您正在动态创建任务,因此工作人员/调度程序/网络服务器都需要访问构建 DAG 所需的任何依赖项。在您的情况下,这是一个数据库连接。 @Viraj Parekh 的建议允许您将该依赖项转移到工作人员可以访问的气流变量。
-
@cwurtz - 作为答案添加了更新
标签: airflow directed-acyclic-graphs airflow-scheduler