【发布时间】:2023-01-13 03:13:17
【问题描述】:
我有一个问题,实际上我无法在 dagster 中解决。
我有以下配置:
我在第 1 步中从端点获取数据
第 2 步动态获取客户列表:
第 3 步是使用来自第 1 步的响应对来自第 2 步的每个客户进行并行更新的数据库。
在调用步骤 3 之前,我有一个函数用于为步骤 2 的每个客户端创建 DynamicOutput,名称为“parallelize_clients”,以便在调用它时并行化步骤 3 的更新过程,最后我有一个图表来加入操作.
@op()
def step_1_get_response():
return {'exemple': 'data'}
@op()
def step_2_get_client_list():
return ['client_1', 'client_2', 'client_3'] #the number of customers is dynamic.
@op(out=DynamicOut())
def parallelize_clients(context, client_list):
for client in client_list:
yield DynamicOutput(client, mapping_key=str(client))
@op()
def step_3_update_database_cliente(response, client):
...OPERATION UPDATE IN DATABASE CLIENT
@graph()
def job_exemple_graph():
response = step_1_get_response()
clients_list = step_2_get_client_list()
clients = parallelize_clients(clients_list)
#run the functions in parallel
clients.map(lambda client: step_3_update_database_cliente(response, client))
根据文档,@Op 在其依赖关系得到满足后立即启动,对于没有依赖关系的 Ops,它们会立即执行,而没有确切的执行顺序。示例:我的 step1 和 step2 没有依赖关系,因此两者自动并行运行。客户端返回后,执行“parallelize_clients()”函数,最后,我在图中有一个映射,根据客户端的数量动态创建多个执行(DynamicOutput)
到目前为止它有效,一切都很好。这就是问题所在。我只需要在step3完全完成后才执行一个特定的功能,并且由于它是动态创建的,因此会并行生成多个执行,但是,我无法控制仅在所有这些并行执行完成后才执行一个功能。
在图中,我试图在最后调用操作“exemplolasststep() step_4”,但是,步骤 4 与“step1”和“step2”一起执行,我真的希望 step4 仅在 step3 之后执行,但是不是我能以某种方式让它工作。有人可以帮我吗?
我试图用
@op(ins={"start": In(Nothing)})
def step_4():
pass
在图中,调用操作时,我尝试在 step_4() 函数调用中执行 map 调用;例子
@graph()
def job_exemple_graph():
response = step_1_get_response()
clients_list = step_2_get_client_list()
clients = parallelize_clients(clients_list)
#run the functions in parallel
step_4(start=clients.map(lambda client: step_3_update_database_cliente(response, client)))
我也尝试过其他方法,但无济于事。
【问题讨论】:
标签: dagster